diff --git a/src/ChartExtractor/extraction/blood_pressure_and_heart_rate.py b/src/ChartExtractor/extraction/blood_pressure_and_heart_rate.py index 98f7b7c..1829b07 100644 --- a/src/ChartExtractor/extraction/blood_pressure_and_heart_rate.py +++ b/src/ChartExtractor/extraction/blood_pressure_and_heart_rate.py @@ -9,100 +9,96 @@ from ..utilities.detections import Detection -def find_timestamp(time_legend: List[Cluster], keypoint_x: float) -> str: +def find_timestamp(legend: Dict[str, Tuple[float, float]], keypoint_x: float) -> str: """Given a keypoint on a blood pressure or heart rate detection, finds the timestamp. Args: - `time_legend` (List[Cluster]): - The named clusters which form the timestamp legend that runs horizontally on the top - side of the blood pressure and heart rate section. + `legend` (Dict[str, Tuple[float, float]]): + The dictionary that maps the name of legend entries to their locations on the image. `keypoint_x` (float): The x value of the keypoint. Returns: The label of the closest timestamp cluster. """ - time_legend_centers: Dict[str, float] = { - clust.label: clust.bounding_box.center[0] for clust in time_legend + time_legend: Dict[str, Tuple[float, float]] = { + k:v for (k, v) in legend.items() if "_mins" in k } distances: Dict[str, float] = { - name: abs(legend_loc - keypoint_x) - for (name, legend_loc) in time_legend_centers.items() + name: abs(legend_loc[0] - keypoint_x) + for (name, legend_loc) in time_legend.items() } return min(distances, key=distances.get) -def find_value(value_legend: List[Cluster], keypoint_y: float) -> int: +def find_value(legend: Dict[str, Tuple[float, float]], keypoint_y: float) -> int: """Given a keypoint on a blood pressure or heart rate detection, finds the in mmhg/bpm value. Finds the closest two legend values, then uses the distance between the detection and both of the closest values to find an approximate value in between. Args: - `value_legend` (List[Cluster]): - The named clusters which form the mmhg/bpm legend that runs vertically on the left - side of the blood pressure and heart rate section. + `legend` (Dict[str, Tuple[float, float]]): + The dictionary that maps the name of legend entries to their locations on the image. `keypoint_y` (float): The y value of the keypoint. Returns: The approximate value that the keypoint encodes in mmhg/bpm. """ - value_legend_centers: Dict[str, float] = { - clust.label: clust.bounding_box.center[1] for clust in value_legend + value_legend: Dict[str, float] = { + k:v for (k, v) in legend.items() if "_mmhg" in k } distances: Dict[str, float] = { - name: abs(legend_loc - keypoint_y) - for (name, legend_loc) in value_legend_centers.items() + name: abs(legend_loc[1] - keypoint_y) + for (name, legend_loc) in value_legend.items() } first_closest: str = min(distances, key=distances.get) distances.pop(first_closest) second_closest: str = min(distances, key=distances.get) total_dist: float = abs( - value_legend_centers[first_closest] - value_legend_centers[second_closest] + value_legend[first_closest][1] - value_legend[second_closest][1] ) smaller_of_two_values = min( [first_closest, second_closest], key=lambda leg: int(leg.split("_")[0]) ) fractional_component = ( - abs(value_legend_centers[smaller_of_two_values] - keypoint_y) / total_dist + abs(value_legend[smaller_of_two_values][1] - keypoint_y) / total_dist ) * 10 return int(smaller_of_two_values.split("_")[0]) + int(fractional_component) def extract_heart_rate_and_blood_pressure( detections: List[Detection], - time_clusters: List[Cluster], - value_clusters: List[Cluster], + legend: Dict[str, Tuple[float, float]], ) -> Dict[str, Dict[str, str]]: """Extracts the heart rate and blood pressure data from the detections. Args: `detections` (List[Detection]): The keypoint detections of the systolic, diastolic, and heart rate markings. - `time_clusters` (List[Cluster]): - The clusters corresponding to the timestamps. - `value_clusters` (List[Cluster]): - The clusters corresponding to the mmhg and bpm values. + `legend` (Dict[str, Tuple[float, float]]): + The dictionary that maps the name of legend entries to their locations on the image. Returns: A dictionary mapping each timestamp to the systolic, diastolic, and heart rate reading that was recorded at that time. """ - + def filter_detections_outside_bp_and_hr_area(detections): + leftmost_point: float = min([point[0] for point in legend.values()]) + topmost_point: float = min([point[1] for point in legend.values()]) + rightmost_point: float = max([point[0] for point in legend.values()]) + bottommost_point: float = max([point[1] for point in legend.values()]) + return list( filter( lambda d: all( [ - d.annotation.bottom - > min(vc.bounding_box.top for vc in value_clusters), - d.annotation.top - < max(vc.bounding_box.bottom for vc in value_clusters), - d.annotation.left - > min(tc.bounding_box.left for tc in time_clusters), - d.annotation.right - < max(tc.bounding_box.right for tc in time_clusters), + d.annotation.bottom > topmost_point, + d.annotation.top < bottommost_point, + d.annotation.right > leftmost_point, + d.annotation.left < rightmost_point, ] ), detections, @@ -115,7 +111,7 @@ def filter_detections_outside_bp_and_hr_area(detections): for det in detections: point: Tuple[float, float] = det.annotation.keypoint category: str = det.annotation.category - timestamp: str = find_timestamp(time_clusters, point.x) + timestamp: str = find_timestamp(legend, point.x) if data.get(timestamp) is None: data[timestamp] = {category: det} elif data[timestamp].get(category) is None: @@ -129,6 +125,6 @@ def filter_detections_outside_bp_and_hr_area(detections): for category in data[timestamp].keys(): point: Tuple[float, float] = data[timestamp][category].annotation.keypoint suffix: str = "bpm" if category == "heart_rate" else "mmhg" - value: int = find_value(value_clusters, point.y) + value: int = find_value(legend, point.y) data[timestamp][category] = f"{value}_{suffix}" return data diff --git a/src/ChartExtractor/extraction/extraction.py b/src/ChartExtractor/extraction/extraction.py index b681350..f3fbd7a 100644 --- a/src/ChartExtractor/extraction/extraction.py +++ b/src/ChartExtractor/extraction/extraction.py @@ -18,6 +18,7 @@ detect_objects_using_tiling, label_studio_to_bboxes, ) +from ..extraction.find_legend import find_legend from ..extraction.inhaled_volatile import extract_inhaled_volatile from ..extraction.intraoperative_digit_boxes import ( extract_drug_codes, @@ -110,7 +111,14 @@ / MODEL_CONFIG["checkboxes"]["name"].replace(".onnx", ".json"), MODEL_CONFIG["checkboxes"]["imgsz"], MODEL_CONFIG["checkboxes"]["imgsz"], - lazy_loading=True + lazy_loading=True, +) +LEGEND_MODEL = OnnxYolov11Detection( + PATH_TO_MODELS / MODEL_CONFIG["whole_number_legend"]["name"], + PATH_TO_MODEL_METADATA / MODEL_CONFIG["whole_number_legend"]["name"].replace(".onnx", ".json"), + MODEL_CONFIG["whole_number_legend"]["imgsz"], + MODEL_CONFIG["whole_number_legend"]["imgsz"], + lazy_loading=True, ) @@ -222,12 +230,12 @@ def run_intraoperative_models(intraop_image: Image.Image) -> Dict[str, List[Dete ) # checkboxes - tile_size = compute_tile_size(MODEL_CONFIG["checkboxes"], intraop_image.size) + ckbx_tile_size = compute_tile_size(MODEL_CONFIG["checkboxes"], intraop_image.size) detections_dict["checkboxes"] = detect_objects_using_tiling( intraop_image, CHECKBOXES_MODEL, - tile_size, - tile_size, + ckbx_tile_size, + ckbx_tile_size, MODEL_CONFIG["checkboxes"]["horz_overlap_proportion"], MODEL_CONFIG["checkboxes"]["vert_overlap_proportion"], nms_threshold=0.8, @@ -270,6 +278,19 @@ def run_intraoperative_models(intraop_image: Image.Image) -> Dict[str, List[Dete MODEL_CONFIG["heart_rate"]["vert_overlap_proportion"], ) + # legend + legend_tile_size: int = compute_tile_size( + MODEL_CONFIG["whole_number_legend"], intraop_image.size + ) + detections_dict["legend"] = detect_objects_using_tiling( + intraop_image.copy(), + LEGEND_MODEL, + legend_tile_size, + legend_tile_size, + MODEL_CONFIG["whole_number_legend"]["horz_overlap_proportion"], + MODEL_CONFIG["whole_number_legend"]["vert_overlap_proportion"], + ) + return detections_dict @@ -403,27 +424,21 @@ def assign_meaning_to_intraoperative_detections( extracted_data["codes"] = extract_drug_codes( corrected_detections_dict["numbers"], *image_size ) - extracted_data["timing"] = extract_surgical_timing( + extracted_data["intraoperative_timing"] = extract_surgical_timing( corrected_detections_dict["numbers"], *image_size ) extracted_data["ett_size"] = extract_ett_size( corrected_detections_dict["numbers"], *image_size ) - - # extract inhaled volatile drugs - time_boxes, mmhg_boxes = isolate_blood_pressure_legend_bounding_boxes( - [det.annotation for det in corrected_detections_dict["landmarks"]], *image_size - ) - time_clusters: List[Cluster] = cluster_boxes( - time_boxes, cluster_kmeans, "mins", possible_nclusters=[40, 41, 42] - ) - mmhg_clusters: List[Cluster] = cluster_boxes( - mmhg_boxes, cluster_kmeans, "mmhg", possible_nclusters=[18, 19, 20] + + # get legend locations + legend_locations: Dict[str, Tuple[float, float]] = find_legend( + intraop_detections_dict["legend"], + image_size[0], + image_size[1], ) - legend_locations: Dict[str, Tuple[float, float]] = find_legend_locations( - time_clusters + mmhg_clusters - ) + # extract inhaled volatile drugs extracted_data["inhaled_volatile"] = extract_inhaled_volatile( corrected_detections_dict["numbers"], legend_locations, @@ -443,8 +458,7 @@ def assign_meaning_to_intraoperative_detections( extracted_data["bp_and_hr"] = extract_heart_rate_and_blood_pressure( bp_and_hr_dets, - time_clusters, - mmhg_clusters, + legend_locations, ) # extract physiological indicators @@ -565,24 +579,27 @@ def digitize_intraop_record(image: Image.Image) -> Dict: # extract drug code and surgical timing codes: Dict = {"codes": extract_drug_codes(digit_detections, *image.size)} - times: Dict = {"timing": extract_surgical_timing(digit_detections, *image.size)} + times: Dict = {"intraoperative_timing": extract_surgical_timing(digit_detections, *image.size)} ett_size: Dict = {"ett_size": extract_ett_size(digit_detections, *image.size)} - - # extract inhaled volatile drugs - time_boxes, mmhg_boxes = isolate_blood_pressure_legend_bounding_boxes( - [det.annotation for det in document_landmark_detections], *image.size + + # get legend locations + legend_tile_size: int = compute_tile_size( + MODEL_CONFIG["whole_number_legend"], image.size ) - time_clusters: List[Cluster] = cluster_boxes( - time_boxes, cluster_kmeans, "mins", possible_nclusters=[40, 41, 42] - ) - mmhg_clusters: List[Cluster] = cluster_boxes( - mmhg_boxes, cluster_kmeans, "mmhg", possible_nclusters=[18, 19, 20] + legend_detections = detect_objects_using_tiling( + image, + LEGEND_MODEL, + legend_tile_size, + legend_tile_size, + MODEL_CONFIG["whole_number_legend"]["horz_overlap_proportion"], + MODEL_CONFIG["whole_number_legend"]["vert_overlap_proportion"], ) - - legend_locations: Dict[str, Tuple[float, float]] = find_legend_locations( - time_clusters + mmhg_clusters + legend_locations: Dict[str, Tuple[float, float]] = find_legend( + legend_detections, + *image.size, ) - + + # extract inhaled volatile drugs inhaled_volatile: Dict = { "inhaled_volatile": extract_inhaled_volatile( digit_detections, legend_locations, document_landmark_detections @@ -591,7 +608,7 @@ def digitize_intraop_record(image: Image.Image) -> Dict: # extract bp and hr bp_and_hr: Dict = { - "bp_and_hr": make_bp_and_hr_detections(image, time_clusters, mmhg_clusters) + "bp_and_hr": make_bp_and_hr_detections(image, legend_locations) } # extract physiological indicators @@ -888,18 +905,15 @@ def compute_tile_size(model_config: Dict, image_size: Tuple[int, int]) -> int: def make_bp_and_hr_detections( image: Image.Image, - time_clusters: List[Cluster], - mmhg_clusters: List[Cluster], + legend: Dict[str, Tuple[float, float]] ) -> Dict: """Finds blood pressure symbols and associates a value and timestamp to them. Args: `image` (Image.Image): The image to detect on. - `time_clusters` (List[Cluster]): - A list of Cluster objects encoding the location of the time legend. - `mmhg_clusters` (List[Cluster]): - A list of Cluster objects encoding the location of the mmhg/bpm legend. + `legend` (Dict[str, Tuple[float, float]]): + The dictionary that maps the name of legend entries to their locations on the image. Returns: A dictionary mapping timestamps to values for systolic, diastolic, and heart rate. @@ -934,9 +948,7 @@ def make_bp_and_hr_detections( ) dets: List[Detection] = sys_dets + dia_dets + hr_dets - bp_and_hr = extract_heart_rate_and_blood_pressure( - dets, time_clusters, mmhg_clusters - ) + bp_and_hr = extract_heart_rate_and_blood_pressure(dets, legend) return bp_and_hr diff --git a/src/ChartExtractor/extraction/find_legend.py b/src/ChartExtractor/extraction/find_legend.py new file mode 100644 index 0000000..51ba11d --- /dev/null +++ b/src/ChartExtractor/extraction/find_legend.py @@ -0,0 +1,376 @@ +"""A module which converts legend detections into (x, y) coordinates of legend entries.""" + +# Built-in imports +from itertools import pairwise +from typing import Dict, List, Tuple +import warnings + +# Internal imports +from ..utilities.annotations import BoundingBox +from ..utilities.detections import Detection + +# External imports +import numpy as np +from scipy.stats import gaussian_kde + + +def find_legend( + legend_detections: List[Detection], + image_width: int, + image_height: int, +) -> Dict[str, Tuple[float, float]]: + """Finds the location of the legend. + + The legend has two components. The first is timing which runs across the page left to right, + and the second is the mmhg/bpm which runs along the page top to bottom. This function + determines the location of each part of the legend and returns a dictionary. + + Args: + legend_detections (List[Detection]): + The homography-corrected legend detections. + image_width (int): + The image's width. + image_height (int): + The image's height. + + Returns: + A dictionary whose keys are the name of the legend entry ("X_mmhg" for mmhg/bpm entries and + "Y_mins" for time entries), and whose values are the normalized location of that legend + marking. + """ + bboxes: List[BoundingBox] = [det.annotation for det in legend_detections] + time_bboxes, mmhg_bboxes = __separate_mmhg_and_timing_detections( + bboxes, + image_height, + image_width, + ) + + legend_locations: Dict[str, Tuple[float, float]] = dict() + legend_locations.update(__convert_mmhg_bboxes_to_legend_locations(mmhg_bboxes)) + legend_locations.update(__convert_time_bboxes_to_legend_locations(time_bboxes)) + + return legend_locations + + +def __separate_mmhg_and_timing_detections( + legend_bounding_boxes: List[BoundingBox], + image_height: int, + image_width: int, +) -> Tuple[List[BoundingBox], List[BoundingBox]]: + """Separates the timing detections from the mmhg detections. + + Args: + legend_bounding_boxes (List[Detection]): + The homography-corrected legend detections. + image_height (int): + The image's height. + image_width (int): + The image's width. + + Returns: + A tuple containing the (timing detections, mmhg detections). + """ + bboxes: List[BoundingBox] = list( + filter( + lambda bb: 0.2 * image_height < bb.center[1] < 0.8 * image_height, + legend_bounding_boxes, + ) + ) + + # x_loc and y_loc form the point at the top left corner of the bp and hr section. + x_loc: int = __find_density_max([bb.left for bb in bboxes], image_width) + y_loc: int = __find_density_max([bb.top for bb in bboxes], image_height) + + # heuristics to determine if the box is a time box or mmhg box. + def is_time_box(box: BoundingBox): + return abs(box.center[0] - x_loc) > abs(box.center[1] - y_loc) + + def is_mmhg_box(box: BoundingBox): + return abs(box.center[0] - x_loc) < abs(box.center[1] - y_loc) + + time_bboxes: List[BoundingBox] = list(filter(is_time_box, bboxes)) + mmhg_bboxes: List[BoundingBox] = list(filter(is_mmhg_box, bboxes)) + + # Return a tuple of bounding boxes in the top-right and bottom-left regions + return time_bboxes, mmhg_bboxes + + +def __find_density_max(values: List[int], search_area: int) -> int: + """Given a list of values and a search area, find the index of where the highest density is. + + The list of values correspond to identifying points for the bounding boxes and the search + area corresponds to the images height or width. + + Args: + `values` (List[int]): + List of identifying points for the bounding boxes + `search_area` (int): + height/width of the image dependent on whether x or y axis is being searched. + + Returns: + The axis value that has the highest density of bounding boxes. + """ + kde = gaussian_kde(values, bw_method=0.2) + values = np.linspace(0, search_area, 10000) + kde_vals = kde(values) + max_index = np.argmax(kde_vals) + return values[max_index] + + +def __convert_mmhg_bboxes_to_legend_locations( + mmhg_bounding_boxes: List[BoundingBox], +) -> Dict[str, Tuple[float, float]]: + """Attempts to convert the mmhg bounding boxes into pixel locations of the legend. + + Args: + mmhg_bounding_boxes (List[BoundingBox]): + The bounding boxes that encode the mmhg/bpm locations. + + Returns: + A dictionary mapping the names of the mmhg/bpm legend entries (X_bpm) to (x, y) coordinates + on the image. + + Raises: + ValueError: + If the function cannot resolve an issue caused by there being too many detections, too + few detections, or too many mislabeled detections. + """ + if len(mmhg_bounding_boxes) < 19: + raise ValueError( + f"Legend detection found too few legend entries for mmhg: {len(mmhg_bounding_boxes)}" + ) + if len(mmhg_bounding_boxes) > 21: + raise ValueError( + f"Legend detection found too many legend entries for mmhg: {len(mmhg_bounding_boxes)}" + ) + + mmhg_legend_locations: Dict[str, Tuple[float, float]] = dict() + mmhg_bounding_boxes: List[BoundingBox] = sorted( + mmhg_bounding_boxes, key=lambda bb: bb.center[1], reverse=True + ) + median_y_distance: float = np.median( + [ + (bb_0.center[1] - bb_1.center[1]) + for (bb_0, bb_1) in pairwise(mmhg_bounding_boxes) + ] + ) + + for ix, mmhg_bbox in enumerate(mmhg_bounding_boxes): + is_first_box: bool = ix == 0 + is_last_box: bool = ix == len(mmhg_bounding_boxes) - 1 + + if is_first_box and mmhg_bounding_boxes[0].category != "30": + warnings.warn( + "An anomaly was detected in the mmhg bboxes. Attempting to fix." + ) + if ( + mmhg_bounding_boxes[0].category == "40" + and mmhg_bounding_boxes[1].category == "50" + ): + mmhg_legend_locations["30_mmhg"] = ( + mmhg_bounding_boxes[0].center[0], + mmhg_bounding_boxes[0].center[1] + median_y_distance, + ) + mmhg_legend_locations["40_mmhg"] = mmhg_bbox.center + continue + else: + raise ValueError("Irrecoverable anomaly in mmhg bbox detection.") + elif is_last_box and mmhg_bounding_boxes[-1].category != "220": + warnings.warn( + "An anomaly was detected in the mmhg bboxes. Attempting to fix." + ) + if ( + mmhg_bounding_boxes[-1].category == "210" + and mmhg_bounding_boxes[-2].category == "200" + ): + mmhg_legend_locations["210_mmhg"] = mmhg_bbox.center + mmhg_legend_locations["220_mmhg"] = ( + mmhg_bounding_boxes[-1].center[0], + mmhg_bounding_boxes[-1].center[1] - median_y_distance, + ) + continue + else: + raise ValueError("Irrecoverable anomaly in mmhg bbox detection.") + elif all( + [ + not is_first_box, + not is_last_box, + int(mmhg_bbox.category) - int(mmhg_bounding_boxes[ix - 1].category) + != 10, + ] + ): + warnings.warn( + "An anomaly was detected in the mmhg bboxes. Attempting to fix." + ) + + previous_box_category: int = int(mmhg_bounding_boxes[ix - 1].category) + next_box_category: int = int(mmhg_bounding_boxes[ix + 1].category) + distance_to_previous_box: float = abs( + mmhg_bounding_boxes[ix - 1].center[1] - mmhg_bbox.center[1] + ) + box_is_mislabeled: bool = next_box_category - previous_box_category != 20 + # If the distance to the last box is more than 10 pixels off the median, its missing. + previous_box_is_missing: bool = distance_to_previous_box > 10 + # If the distance to the last box is less than 10 pixels off the median, its an + # extra box. + box_is_erroneous: bool = ( + distance_to_previous_box < (2 / 3) * median_y_distance + ) + if box_is_erroneous: + pass + elif previous_box_is_missing: + imputed_missing_box_center: Tuple[float, float] = ( + (0.5) + * (mmhg_bbox.center[0] + mmhg_bounding_boxes[ix - 1].center[0]), + (0.5) + * (mmhg_bbox.center[1] + mmhg_bounding_boxes[ix - 1].center[1]), + ) + imputed_missing_box_label: int = int( + 0.5 * (previous_box_category + int(mmhg_bbox.category)) + ) + mmhg_legend_locations[f"{imputed_missing_box_label}_mmhg"] = ( + imputed_missing_box_center + ) + elif box_is_mislabeled: + imputed_label = int(0.5 * (next_box_category + previous_box_category)) + mmhg_legend_locations[f"{imputed_label}_mmhg"] = mmhg_bbox.center + continue + else: + raise ValueError("Irrecoverable anomaly in mmhg box detection.") + mmhg_legend_locations[f"{mmhg_bbox.category}_mmhg"] = mmhg_bbox.center + return mmhg_legend_locations + + +def __convert_time_bboxes_to_legend_locations( + time_bounding_boxes: List[BoundingBox], +) -> Dict[str, Tuple[float, float]]: + """Attempts to convert the time bounding boxes into pixel locations of the legend. + + Args: + time_bounding_boxes (List[BoundingBox]): + The bounding boxes that encode the time locations. + + Returns: + A dictionary mapping the names of the time legend entries (X_mins) to (x, y) coordinates + on the image. + + Raises: + ValueError: + If the function cannot resolve an issue caused by there being too many detections, too + few detections, or too many mislabeled detections. + """ + if len(time_bounding_boxes) < 41: + raise ValueError( + f"Legend detection found too few legend entries for time: {len(time_bounding_boxes)}" + ) + if len(time_bounding_boxes) > 43: + raise ValueError( + f"Legend detection found too many legend entries for time: {len(time_bounding_boxes)}" + ) + + time_legend_locations: Dict[str, Tuple[float, float]] = dict() + time_bounding_boxes: List[BoundingBox] = sorted( + time_bounding_boxes, + key=lambda bb: bb.center[0], + ) + median_x_distance: float = np.median( + [ + (bb_1.center[0] - bb_0.center[0]) + for (bb_0, bb_1) in pairwise(time_bounding_boxes) + ] + ) + + def timedelta(ix: int): + return (ix // 12) * 60 + + for ix, time_bbox in enumerate(time_bounding_boxes): + is_first_box: bool = ix == 0 + is_last_box: bool = ix == len(time_bounding_boxes) - 1 + time_gap_too_large = (int(time_bbox.category) + timedelta(ix)) - ( + int(time_bounding_boxes[ix - 1].category) + timedelta(ix - 1) + ) != 5 # There should only be 5 minutes between legend entries. + if is_first_box and time_bounding_boxes[0].category != "0": + warnings.warn( + "An anomaly was detected in the time bboxes. Attempting to fix." + ) + if ( + time_bounding_boxes[0].category == "5" + and time_bounding_boxes[1].category == "10" + ): + time_legend_locations["0_mins"] = ( + time_bounding_boxes[0].center[0] - median_x_distance, + time_bounding_boxes[0].center[1], + ) + time_legend_locations["5_mins"] = time_bbox.center + continue + else: + raise ValueError("Irrecoverable anomaly in time bbox detection.") + elif is_last_box and time_bounding_boxes[-1].category != "25": + warnings.warn( + "An anomaly was detected in the time bboxes. Attempting to fix." + ) + if ( + time_bounding_boxes[-1].category == "20" + and time_bounding_boxes[-2].category == "15" + ): + time_legend_locations["200_mins"] = time_bbox.center + time_legend_locations["205_mins"] = ( + time_bounding_boxes[-1].center[0] + median_x_distance, + time_bounding_boxes[-1].center[1], + ) + continue + else: + raise ValueError("Irrecoverable anomaly in time bbox detection.") + elif all([not is_first_box, not is_last_box, time_gap_too_large]): + warnings.warn( + "An anomaly was detected in the time bboxes. Attempting to fix." + ) + + previous_box_category: int = int( + time_bounding_boxes[ix - 1].category + ) + timedelta(ix - 1) + next_box_category: int = int( + time_bounding_boxes[ix + 1].category + ) + timedelta(ix + 1) + distance_to_previous_box: float = abs( + time_bounding_boxes[ix - 1].center[0] - time_bbox.center[0] + ) + box_is_mislabeled: bool = next_box_category - previous_box_category != 10 + # If the distance to the last box is more than 10 pixels off the median, its missing. + previous_box_is_missing: bool = ( + distance_to_previous_box - median_x_distance > 10 + ) + # If the distance to the last box is less than 10 pixels off the median, its an + # extra box. + box_is_erroneous: bool = ( + distance_to_previous_box < (2 / 3) * median_x_distance + ) + if box_is_erroneous: + pass + elif previous_box_is_missing: + imputed_missing_box_center: Tuple[float, float] = ( + (0.5) + * (time_bbox.center[0] + time_bounding_boxes[ix - 1].center[0]), + (0.5) + * (time_bbox.center[1] + time_bounding_boxes[ix - 1].center[1]), + ) + imputed_missing_box_label: int = int( + 0.5 + * (previous_box_category + int(time_bbox.category) + timedelta(ix)) + ) + time_legend_locations[ + f"{imputed_missing_box_label+timedelta(ix)}_mins" + ] = imputed_missing_box_center + elif box_is_mislabeled: + imputed_label = int(0.5 * (next_box_category + previous_box_category)) + time_legend_locations[f"{imputed_label+timedelta(ix)}_mins"] = ( + time_bbox.center + ) + continue + else: + raise ValueError("Irrecoverable anomaly in time box detection.") + time_legend_locations[f"{int(time_bbox.category)+timedelta(ix)}_mins"] = ( + time_bbox.center + ) + + return time_legend_locations diff --git a/src/ChartExtractor/extraction/intraoperative_digit_boxes.py b/src/ChartExtractor/extraction/intraoperative_digit_boxes.py index 45374ba..f0ccef1 100644 --- a/src/ChartExtractor/extraction/intraoperative_digit_boxes.py +++ b/src/ChartExtractor/extraction/intraoperative_digit_boxes.py @@ -143,7 +143,7 @@ def extract_surgical_timing( tens_place_val: Optional[int] = surgical_timing_values.get(prefix + "_tens") ones_place_val: Optional[int] = surgical_timing_values.get(prefix + "_ones") if None not in [tens_place_val, ones_place_val]: - surgical_timing[prefix] = str(tens_place_val.category) + str( - ones_place_val.category + surgical_timing[prefix] = str( + int(str(tens_place_val.category) + str(ones_place_val.category)) ) return surgical_timing diff --git a/src/ChartExtractor/extraction/preoperative_postoperative_digit_boxes.py b/src/ChartExtractor/extraction/preoperative_postoperative_digit_boxes.py index 603cf3c..7de6390 100644 --- a/src/ChartExtractor/extraction/preoperative_postoperative_digit_boxes.py +++ b/src/ChartExtractor/extraction/preoperative_postoperative_digit_boxes.py @@ -384,7 +384,11 @@ def extract_preop_postop_digit_data( A dictionary with all the preoperative and postoperative data. """ data: Dict[str, str] = dict() - data["timing"] = extract_time_of_assessment(number_detections, im_width, im_height) + data["preoperative_postoperative_timing"] = extract_time_of_assessment( + number_detections, + im_width, + im_height + ) data["age"] = extract_age(number_detections, im_width, im_height) data["height"] = extract_height(number_detections, im_width, im_height) data["weight"] = extract_weight(number_detections, im_width, im_height) diff --git a/src/ChartExtractor/utilities/detections.py b/src/ChartExtractor/utilities/detections.py index 63a807f..2275949 100644 --- a/src/ChartExtractor/utilities/detections.py +++ b/src/ChartExtractor/utilities/detections.py @@ -31,7 +31,10 @@ class Detection: confidence: float @staticmethod - def from_dict(detection_dict: Dict[str, Any], annotation_type: Union[BoundingBox, Keypoint]): + def from_dict( + detection_dict: Dict[str, Any], + annotation_type: Union[type[BoundingBox], type[Keypoint]] + ) -> "Detection": """Creates a `Detection` from a dictionary of data. Args: