From 85c05f430724a3d27a37ce5d4770085b0265dfd3 Mon Sep 17 00:00:00 2001 From: Ryan Folks Date: Fri, 8 Aug 2025 11:26:47 -0400 Subject: [PATCH 01/13] Copy over code from Jupyter notebook. --- src/ChartExtractor/extraction/find_legend.py | 376 +++++++++++++++++++ 1 file changed, 376 insertions(+) create mode 100644 src/ChartExtractor/extraction/find_legend.py diff --git a/src/ChartExtractor/extraction/find_legend.py b/src/ChartExtractor/extraction/find_legend.py new file mode 100644 index 0000000..51ba11d --- /dev/null +++ b/src/ChartExtractor/extraction/find_legend.py @@ -0,0 +1,376 @@ +"""A module which converts legend detections into (x, y) coordinates of legend entries.""" + +# Built-in imports +from itertools import pairwise +from typing import Dict, List, Tuple +import warnings + +# Internal imports +from ..utilities.annotations import BoundingBox +from ..utilities.detections import Detection + +# External imports +import numpy as np +from scipy.stats import gaussian_kde + + +def find_legend( + legend_detections: List[Detection], + image_width: int, + image_height: int, +) -> Dict[str, Tuple[float, float]]: + """Finds the location of the legend. + + The legend has two components. The first is timing which runs across the page left to right, + and the second is the mmhg/bpm which runs along the page top to bottom. This function + determines the location of each part of the legend and returns a dictionary. + + Args: + legend_detections (List[Detection]): + The homography-corrected legend detections. + image_width (int): + The image's width. + image_height (int): + The image's height. + + Returns: + A dictionary whose keys are the name of the legend entry ("X_mmhg" for mmhg/bpm entries and + "Y_mins" for time entries), and whose values are the normalized location of that legend + marking. + """ + bboxes: List[BoundingBox] = [det.annotation for det in legend_detections] + time_bboxes, mmhg_bboxes = __separate_mmhg_and_timing_detections( + bboxes, + image_height, + image_width, + ) + + legend_locations: Dict[str, Tuple[float, float]] = dict() + legend_locations.update(__convert_mmhg_bboxes_to_legend_locations(mmhg_bboxes)) + legend_locations.update(__convert_time_bboxes_to_legend_locations(time_bboxes)) + + return legend_locations + + +def __separate_mmhg_and_timing_detections( + legend_bounding_boxes: List[BoundingBox], + image_height: int, + image_width: int, +) -> Tuple[List[BoundingBox], List[BoundingBox]]: + """Separates the timing detections from the mmhg detections. + + Args: + legend_bounding_boxes (List[Detection]): + The homography-corrected legend detections. + image_height (int): + The image's height. + image_width (int): + The image's width. + + Returns: + A tuple containing the (timing detections, mmhg detections). + """ + bboxes: List[BoundingBox] = list( + filter( + lambda bb: 0.2 * image_height < bb.center[1] < 0.8 * image_height, + legend_bounding_boxes, + ) + ) + + # x_loc and y_loc form the point at the top left corner of the bp and hr section. + x_loc: int = __find_density_max([bb.left for bb in bboxes], image_width) + y_loc: int = __find_density_max([bb.top for bb in bboxes], image_height) + + # heuristics to determine if the box is a time box or mmhg box. + def is_time_box(box: BoundingBox): + return abs(box.center[0] - x_loc) > abs(box.center[1] - y_loc) + + def is_mmhg_box(box: BoundingBox): + return abs(box.center[0] - x_loc) < abs(box.center[1] - y_loc) + + time_bboxes: List[BoundingBox] = list(filter(is_time_box, bboxes)) + mmhg_bboxes: List[BoundingBox] = list(filter(is_mmhg_box, bboxes)) + + # Return a tuple of bounding boxes in the top-right and bottom-left regions + return time_bboxes, mmhg_bboxes + + +def __find_density_max(values: List[int], search_area: int) -> int: + """Given a list of values and a search area, find the index of where the highest density is. + + The list of values correspond to identifying points for the bounding boxes and the search + area corresponds to the images height or width. + + Args: + `values` (List[int]): + List of identifying points for the bounding boxes + `search_area` (int): + height/width of the image dependent on whether x or y axis is being searched. + + Returns: + The axis value that has the highest density of bounding boxes. + """ + kde = gaussian_kde(values, bw_method=0.2) + values = np.linspace(0, search_area, 10000) + kde_vals = kde(values) + max_index = np.argmax(kde_vals) + return values[max_index] + + +def __convert_mmhg_bboxes_to_legend_locations( + mmhg_bounding_boxes: List[BoundingBox], +) -> Dict[str, Tuple[float, float]]: + """Attempts to convert the mmhg bounding boxes into pixel locations of the legend. + + Args: + mmhg_bounding_boxes (List[BoundingBox]): + The bounding boxes that encode the mmhg/bpm locations. + + Returns: + A dictionary mapping the names of the mmhg/bpm legend entries (X_bpm) to (x, y) coordinates + on the image. + + Raises: + ValueError: + If the function cannot resolve an issue caused by there being too many detections, too + few detections, or too many mislabeled detections. + """ + if len(mmhg_bounding_boxes) < 19: + raise ValueError( + f"Legend detection found too few legend entries for mmhg: {len(mmhg_bounding_boxes)}" + ) + if len(mmhg_bounding_boxes) > 21: + raise ValueError( + f"Legend detection found too many legend entries for mmhg: {len(mmhg_bounding_boxes)}" + ) + + mmhg_legend_locations: Dict[str, Tuple[float, float]] = dict() + mmhg_bounding_boxes: List[BoundingBox] = sorted( + mmhg_bounding_boxes, key=lambda bb: bb.center[1], reverse=True + ) + median_y_distance: float = np.median( + [ + (bb_0.center[1] - bb_1.center[1]) + for (bb_0, bb_1) in pairwise(mmhg_bounding_boxes) + ] + ) + + for ix, mmhg_bbox in enumerate(mmhg_bounding_boxes): + is_first_box: bool = ix == 0 + is_last_box: bool = ix == len(mmhg_bounding_boxes) - 1 + + if is_first_box and mmhg_bounding_boxes[0].category != "30": + warnings.warn( + "An anomaly was detected in the mmhg bboxes. Attempting to fix." + ) + if ( + mmhg_bounding_boxes[0].category == "40" + and mmhg_bounding_boxes[1].category == "50" + ): + mmhg_legend_locations["30_mmhg"] = ( + mmhg_bounding_boxes[0].center[0], + mmhg_bounding_boxes[0].center[1] + median_y_distance, + ) + mmhg_legend_locations["40_mmhg"] = mmhg_bbox.center + continue + else: + raise ValueError("Irrecoverable anomaly in mmhg bbox detection.") + elif is_last_box and mmhg_bounding_boxes[-1].category != "220": + warnings.warn( + "An anomaly was detected in the mmhg bboxes. Attempting to fix." + ) + if ( + mmhg_bounding_boxes[-1].category == "210" + and mmhg_bounding_boxes[-2].category == "200" + ): + mmhg_legend_locations["210_mmhg"] = mmhg_bbox.center + mmhg_legend_locations["220_mmhg"] = ( + mmhg_bounding_boxes[-1].center[0], + mmhg_bounding_boxes[-1].center[1] - median_y_distance, + ) + continue + else: + raise ValueError("Irrecoverable anomaly in mmhg bbox detection.") + elif all( + [ + not is_first_box, + not is_last_box, + int(mmhg_bbox.category) - int(mmhg_bounding_boxes[ix - 1].category) + != 10, + ] + ): + warnings.warn( + "An anomaly was detected in the mmhg bboxes. Attempting to fix." + ) + + previous_box_category: int = int(mmhg_bounding_boxes[ix - 1].category) + next_box_category: int = int(mmhg_bounding_boxes[ix + 1].category) + distance_to_previous_box: float = abs( + mmhg_bounding_boxes[ix - 1].center[1] - mmhg_bbox.center[1] + ) + box_is_mislabeled: bool = next_box_category - previous_box_category != 20 + # If the distance to the last box is more than 10 pixels off the median, its missing. + previous_box_is_missing: bool = distance_to_previous_box > 10 + # If the distance to the last box is less than 10 pixels off the median, its an + # extra box. + box_is_erroneous: bool = ( + distance_to_previous_box < (2 / 3) * median_y_distance + ) + if box_is_erroneous: + pass + elif previous_box_is_missing: + imputed_missing_box_center: Tuple[float, float] = ( + (0.5) + * (mmhg_bbox.center[0] + mmhg_bounding_boxes[ix - 1].center[0]), + (0.5) + * (mmhg_bbox.center[1] + mmhg_bounding_boxes[ix - 1].center[1]), + ) + imputed_missing_box_label: int = int( + 0.5 * (previous_box_category + int(mmhg_bbox.category)) + ) + mmhg_legend_locations[f"{imputed_missing_box_label}_mmhg"] = ( + imputed_missing_box_center + ) + elif box_is_mislabeled: + imputed_label = int(0.5 * (next_box_category + previous_box_category)) + mmhg_legend_locations[f"{imputed_label}_mmhg"] = mmhg_bbox.center + continue + else: + raise ValueError("Irrecoverable anomaly in mmhg box detection.") + mmhg_legend_locations[f"{mmhg_bbox.category}_mmhg"] = mmhg_bbox.center + return mmhg_legend_locations + + +def __convert_time_bboxes_to_legend_locations( + time_bounding_boxes: List[BoundingBox], +) -> Dict[str, Tuple[float, float]]: + """Attempts to convert the time bounding boxes into pixel locations of the legend. + + Args: + time_bounding_boxes (List[BoundingBox]): + The bounding boxes that encode the time locations. + + Returns: + A dictionary mapping the names of the time legend entries (X_mins) to (x, y) coordinates + on the image. + + Raises: + ValueError: + If the function cannot resolve an issue caused by there being too many detections, too + few detections, or too many mislabeled detections. + """ + if len(time_bounding_boxes) < 41: + raise ValueError( + f"Legend detection found too few legend entries for time: {len(time_bounding_boxes)}" + ) + if len(time_bounding_boxes) > 43: + raise ValueError( + f"Legend detection found too many legend entries for time: {len(time_bounding_boxes)}" + ) + + time_legend_locations: Dict[str, Tuple[float, float]] = dict() + time_bounding_boxes: List[BoundingBox] = sorted( + time_bounding_boxes, + key=lambda bb: bb.center[0], + ) + median_x_distance: float = np.median( + [ + (bb_1.center[0] - bb_0.center[0]) + for (bb_0, bb_1) in pairwise(time_bounding_boxes) + ] + ) + + def timedelta(ix: int): + return (ix // 12) * 60 + + for ix, time_bbox in enumerate(time_bounding_boxes): + is_first_box: bool = ix == 0 + is_last_box: bool = ix == len(time_bounding_boxes) - 1 + time_gap_too_large = (int(time_bbox.category) + timedelta(ix)) - ( + int(time_bounding_boxes[ix - 1].category) + timedelta(ix - 1) + ) != 5 # There should only be 5 minutes between legend entries. + if is_first_box and time_bounding_boxes[0].category != "0": + warnings.warn( + "An anomaly was detected in the time bboxes. Attempting to fix." + ) + if ( + time_bounding_boxes[0].category == "5" + and time_bounding_boxes[1].category == "10" + ): + time_legend_locations["0_mins"] = ( + time_bounding_boxes[0].center[0] - median_x_distance, + time_bounding_boxes[0].center[1], + ) + time_legend_locations["5_mins"] = time_bbox.center + continue + else: + raise ValueError("Irrecoverable anomaly in time bbox detection.") + elif is_last_box and time_bounding_boxes[-1].category != "25": + warnings.warn( + "An anomaly was detected in the time bboxes. Attempting to fix." + ) + if ( + time_bounding_boxes[-1].category == "20" + and time_bounding_boxes[-2].category == "15" + ): + time_legend_locations["200_mins"] = time_bbox.center + time_legend_locations["205_mins"] = ( + time_bounding_boxes[-1].center[0] + median_x_distance, + time_bounding_boxes[-1].center[1], + ) + continue + else: + raise ValueError("Irrecoverable anomaly in time bbox detection.") + elif all([not is_first_box, not is_last_box, time_gap_too_large]): + warnings.warn( + "An anomaly was detected in the time bboxes. Attempting to fix." + ) + + previous_box_category: int = int( + time_bounding_boxes[ix - 1].category + ) + timedelta(ix - 1) + next_box_category: int = int( + time_bounding_boxes[ix + 1].category + ) + timedelta(ix + 1) + distance_to_previous_box: float = abs( + time_bounding_boxes[ix - 1].center[0] - time_bbox.center[0] + ) + box_is_mislabeled: bool = next_box_category - previous_box_category != 10 + # If the distance to the last box is more than 10 pixels off the median, its missing. + previous_box_is_missing: bool = ( + distance_to_previous_box - median_x_distance > 10 + ) + # If the distance to the last box is less than 10 pixels off the median, its an + # extra box. + box_is_erroneous: bool = ( + distance_to_previous_box < (2 / 3) * median_x_distance + ) + if box_is_erroneous: + pass + elif previous_box_is_missing: + imputed_missing_box_center: Tuple[float, float] = ( + (0.5) + * (time_bbox.center[0] + time_bounding_boxes[ix - 1].center[0]), + (0.5) + * (time_bbox.center[1] + time_bounding_boxes[ix - 1].center[1]), + ) + imputed_missing_box_label: int = int( + 0.5 + * (previous_box_category + int(time_bbox.category) + timedelta(ix)) + ) + time_legend_locations[ + f"{imputed_missing_box_label+timedelta(ix)}_mins" + ] = imputed_missing_box_center + elif box_is_mislabeled: + imputed_label = int(0.5 * (next_box_category + previous_box_category)) + time_legend_locations[f"{imputed_label+timedelta(ix)}_mins"] = ( + time_bbox.center + ) + continue + else: + raise ValueError("Irrecoverable anomaly in time box detection.") + time_legend_locations[f"{int(time_bbox.category)+timedelta(ix)}_mins"] = ( + time_bbox.center + ) + + return time_legend_locations From 21dd3954b512c230f7795f9886d611abac6a683f Mon Sep 17 00:00:00 2001 From: Ryan Folks Date: Fri, 8 Aug 2025 11:43:40 -0400 Subject: [PATCH 02/13] Add legend model to extraction.py. --- src/ChartExtractor/extraction/extraction.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/ChartExtractor/extraction/extraction.py b/src/ChartExtractor/extraction/extraction.py index b681350..35fbe83 100644 --- a/src/ChartExtractor/extraction/extraction.py +++ b/src/ChartExtractor/extraction/extraction.py @@ -110,7 +110,14 @@ / MODEL_CONFIG["checkboxes"]["name"].replace(".onnx", ".json"), MODEL_CONFIG["checkboxes"]["imgsz"], MODEL_CONFIG["checkboxes"]["imgsz"], - lazy_loading=True + lazy_loading=True, +) +LEGEND_MODEL = OnnxYolov11Detection( + PATH_TO_MODELS / MODEL_CONFIG["whole_number_legend"]["name"], + PATH_TO_MODEL_METADATA / MODEL_CONFIG["whole_number_legend"]["name"].replace(".onnx", ".json"), + MODEL_CONFIG["whole_number_legend"]["imgsz"], + MODEL_CONFIG["whole_number_legend"]["imgsz"], + lazy_loading=True, ) From 30ecc575e2430e1510e5008d3882e55c6ae445f8 Mon Sep 17 00:00:00 2001 From: Ryan Folks Date: Fri, 8 Aug 2025 12:01:13 -0400 Subject: [PATCH 03/13] Run legend model now when all objects get detected. --- src/ChartExtractor/extraction/extraction.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/ChartExtractor/extraction/extraction.py b/src/ChartExtractor/extraction/extraction.py index 35fbe83..030cc83 100644 --- a/src/ChartExtractor/extraction/extraction.py +++ b/src/ChartExtractor/extraction/extraction.py @@ -277,6 +277,19 @@ def run_intraoperative_models(intraop_image: Image.Image) -> Dict[str, List[Dete MODEL_CONFIG["heart_rate"]["vert_overlap_proportion"], ) + # legend + legend_tile_size: int = compute_tile_size( + MODEL_CONFIG["whole_number_legend"], intraop_image.size + ) + detections_dict["legend"] = detect_objects_using_tiling( + intraop_image.copy(), + LEGEND_MODEL, + legend_tile_size, + legend_tile_size, + MODEL_CONFIG["whole_number_legend"]["horz_overlap_proportion"], + MODEL_CONFIG["whole_number_legend"]["vert_overlap_proportion"], + ) + return detections_dict From 5f90f499b7b8b89edae5636fbf5b7f4d3678146d Mon Sep 17 00:00:00 2001 From: Ryan Folks Date: Fri, 8 Aug 2025 12:05:43 -0400 Subject: [PATCH 04/13] Switch over intraop interpretation function to use new legend code. --- src/ChartExtractor/extraction/extraction.py | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/src/ChartExtractor/extraction/extraction.py b/src/ChartExtractor/extraction/extraction.py index 030cc83..e5096f3 100644 --- a/src/ChartExtractor/extraction/extraction.py +++ b/src/ChartExtractor/extraction/extraction.py @@ -18,6 +18,7 @@ detect_objects_using_tiling, label_studio_to_bboxes, ) +from ..extraction.find_legend import find_legend from ..extraction.inhaled_volatile import extract_inhaled_volatile from ..extraction.intraoperative_digit_boxes import ( extract_drug_codes, @@ -429,21 +430,14 @@ def assign_meaning_to_intraoperative_detections( extracted_data["ett_size"] = extract_ett_size( corrected_detections_dict["numbers"], *image_size ) - - # extract inhaled volatile drugs - time_boxes, mmhg_boxes = isolate_blood_pressure_legend_bounding_boxes( - [det.annotation for det in corrected_detections_dict["landmarks"]], *image_size - ) - time_clusters: List[Cluster] = cluster_boxes( - time_boxes, cluster_kmeans, "mins", possible_nclusters=[40, 41, 42] - ) - mmhg_clusters: List[Cluster] = cluster_boxes( - mmhg_boxes, cluster_kmeans, "mmhg", possible_nclusters=[18, 19, 20] + + # get legend locations + legend_locations: Dict[str, Tuple[float, float]] = find_legend( + intraop_detections_dict["legend"], + **image_size, ) - legend_locations: Dict[str, Tuple[float, float]] = find_legend_locations( - time_clusters + mmhg_clusters - ) + # extract inhaled volatile drugs extracted_data["inhaled_volatile"] = extract_inhaled_volatile( corrected_detections_dict["numbers"], legend_locations, From 2d7dd394fd35977920967ecb660e548fdd2c1da1 Mon Sep 17 00:00:00 2001 From: Ryan Folks Date: Fri, 8 Aug 2025 12:07:02 -0400 Subject: [PATCH 05/13] Switch over end-to-end digitzation function to use new legend code. --- src/ChartExtractor/extraction/extraction.py | 22 +++++++-------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/src/ChartExtractor/extraction/extraction.py b/src/ChartExtractor/extraction/extraction.py index e5096f3..d4b3313 100644 --- a/src/ChartExtractor/extraction/extraction.py +++ b/src/ChartExtractor/extraction/extraction.py @@ -581,22 +581,14 @@ def digitize_intraop_record(image: Image.Image) -> Dict: codes: Dict = {"codes": extract_drug_codes(digit_detections, *image.size)} times: Dict = {"timing": extract_surgical_timing(digit_detections, *image.size)} ett_size: Dict = {"ett_size": extract_ett_size(digit_detections, *image.size)} - - # extract inhaled volatile drugs - time_boxes, mmhg_boxes = isolate_blood_pressure_legend_bounding_boxes( - [det.annotation for det in document_landmark_detections], *image.size - ) - time_clusters: List[Cluster] = cluster_boxes( - time_boxes, cluster_kmeans, "mins", possible_nclusters=[40, 41, 42] - ) - mmhg_clusters: List[Cluster] = cluster_boxes( - mmhg_boxes, cluster_kmeans, "mmhg", possible_nclusters=[18, 19, 20] - ) - - legend_locations: Dict[str, Tuple[float, float]] = find_legend_locations( - time_clusters + mmhg_clusters + + # get legend locations + legend_locations: Dict[str, Tuple[float, float]] = find_legend( + intraop_detections_dict["legend"], + **image_size, ) - + + # extract inhaled volatile drugs inhaled_volatile: Dict = { "inhaled_volatile": extract_inhaled_volatile( digit_detections, legend_locations, document_landmark_detections From 4521147bb5dbf1d499d5a6101c94f34264a3ed1c Mon Sep 17 00:00:00 2001 From: Ryan Folks Date: Fri, 8 Aug 2025 13:54:34 -0400 Subject: [PATCH 06/13] Change blood pressure and heart rate extraction to use the legend dict and not the clusters. --- .../blood_pressure_and_heart_rate.py | 66 +++++++++---------- 1 file changed, 31 insertions(+), 35 deletions(-) diff --git a/src/ChartExtractor/extraction/blood_pressure_and_heart_rate.py b/src/ChartExtractor/extraction/blood_pressure_and_heart_rate.py index 98f7b7c..1829b07 100644 --- a/src/ChartExtractor/extraction/blood_pressure_and_heart_rate.py +++ b/src/ChartExtractor/extraction/blood_pressure_and_heart_rate.py @@ -9,100 +9,96 @@ from ..utilities.detections import Detection -def find_timestamp(time_legend: List[Cluster], keypoint_x: float) -> str: +def find_timestamp(legend: Dict[str, Tuple[float, float]], keypoint_x: float) -> str: """Given a keypoint on a blood pressure or heart rate detection, finds the timestamp. Args: - `time_legend` (List[Cluster]): - The named clusters which form the timestamp legend that runs horizontally on the top - side of the blood pressure and heart rate section. + `legend` (Dict[str, Tuple[float, float]]): + The dictionary that maps the name of legend entries to their locations on the image. `keypoint_x` (float): The x value of the keypoint. Returns: The label of the closest timestamp cluster. """ - time_legend_centers: Dict[str, float] = { - clust.label: clust.bounding_box.center[0] for clust in time_legend + time_legend: Dict[str, Tuple[float, float]] = { + k:v for (k, v) in legend.items() if "_mins" in k } distances: Dict[str, float] = { - name: abs(legend_loc - keypoint_x) - for (name, legend_loc) in time_legend_centers.items() + name: abs(legend_loc[0] - keypoint_x) + for (name, legend_loc) in time_legend.items() } return min(distances, key=distances.get) -def find_value(value_legend: List[Cluster], keypoint_y: float) -> int: +def find_value(legend: Dict[str, Tuple[float, float]], keypoint_y: float) -> int: """Given a keypoint on a blood pressure or heart rate detection, finds the in mmhg/bpm value. Finds the closest two legend values, then uses the distance between the detection and both of the closest values to find an approximate value in between. Args: - `value_legend` (List[Cluster]): - The named clusters which form the mmhg/bpm legend that runs vertically on the left - side of the blood pressure and heart rate section. + `legend` (Dict[str, Tuple[float, float]]): + The dictionary that maps the name of legend entries to their locations on the image. `keypoint_y` (float): The y value of the keypoint. Returns: The approximate value that the keypoint encodes in mmhg/bpm. """ - value_legend_centers: Dict[str, float] = { - clust.label: clust.bounding_box.center[1] for clust in value_legend + value_legend: Dict[str, float] = { + k:v for (k, v) in legend.items() if "_mmhg" in k } distances: Dict[str, float] = { - name: abs(legend_loc - keypoint_y) - for (name, legend_loc) in value_legend_centers.items() + name: abs(legend_loc[1] - keypoint_y) + for (name, legend_loc) in value_legend.items() } first_closest: str = min(distances, key=distances.get) distances.pop(first_closest) second_closest: str = min(distances, key=distances.get) total_dist: float = abs( - value_legend_centers[first_closest] - value_legend_centers[second_closest] + value_legend[first_closest][1] - value_legend[second_closest][1] ) smaller_of_two_values = min( [first_closest, second_closest], key=lambda leg: int(leg.split("_")[0]) ) fractional_component = ( - abs(value_legend_centers[smaller_of_two_values] - keypoint_y) / total_dist + abs(value_legend[smaller_of_two_values][1] - keypoint_y) / total_dist ) * 10 return int(smaller_of_two_values.split("_")[0]) + int(fractional_component) def extract_heart_rate_and_blood_pressure( detections: List[Detection], - time_clusters: List[Cluster], - value_clusters: List[Cluster], + legend: Dict[str, Tuple[float, float]], ) -> Dict[str, Dict[str, str]]: """Extracts the heart rate and blood pressure data from the detections. Args: `detections` (List[Detection]): The keypoint detections of the systolic, diastolic, and heart rate markings. - `time_clusters` (List[Cluster]): - The clusters corresponding to the timestamps. - `value_clusters` (List[Cluster]): - The clusters corresponding to the mmhg and bpm values. + `legend` (Dict[str, Tuple[float, float]]): + The dictionary that maps the name of legend entries to their locations on the image. Returns: A dictionary mapping each timestamp to the systolic, diastolic, and heart rate reading that was recorded at that time. """ - + def filter_detections_outside_bp_and_hr_area(detections): + leftmost_point: float = min([point[0] for point in legend.values()]) + topmost_point: float = min([point[1] for point in legend.values()]) + rightmost_point: float = max([point[0] for point in legend.values()]) + bottommost_point: float = max([point[1] for point in legend.values()]) + return list( filter( lambda d: all( [ - d.annotation.bottom - > min(vc.bounding_box.top for vc in value_clusters), - d.annotation.top - < max(vc.bounding_box.bottom for vc in value_clusters), - d.annotation.left - > min(tc.bounding_box.left for tc in time_clusters), - d.annotation.right - < max(tc.bounding_box.right for tc in time_clusters), + d.annotation.bottom > topmost_point, + d.annotation.top < bottommost_point, + d.annotation.right > leftmost_point, + d.annotation.left < rightmost_point, ] ), detections, @@ -115,7 +111,7 @@ def filter_detections_outside_bp_and_hr_area(detections): for det in detections: point: Tuple[float, float] = det.annotation.keypoint category: str = det.annotation.category - timestamp: str = find_timestamp(time_clusters, point.x) + timestamp: str = find_timestamp(legend, point.x) if data.get(timestamp) is None: data[timestamp] = {category: det} elif data[timestamp].get(category) is None: @@ -129,6 +125,6 @@ def filter_detections_outside_bp_and_hr_area(detections): for category in data[timestamp].keys(): point: Tuple[float, float] = data[timestamp][category].annotation.keypoint suffix: str = "bpm" if category == "heart_rate" else "mmhg" - value: int = find_value(value_clusters, point.y) + value: int = find_value(legend, point.y) data[timestamp][category] = f"{value}_{suffix}" return data From c26bb1cdb84376768dd0a8bddf5e7ae665abcb27 Mon Sep 17 00:00:00 2001 From: Ryan Folks Date: Fri, 8 Aug 2025 13:55:16 -0400 Subject: [PATCH 07/13] Change bp and hr extraction to match changes. --- src/ChartExtractor/extraction/extraction.py | 27 +++++++++++++-------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/src/ChartExtractor/extraction/extraction.py b/src/ChartExtractor/extraction/extraction.py index d4b3313..8ae90ee 100644 --- a/src/ChartExtractor/extraction/extraction.py +++ b/src/ChartExtractor/extraction/extraction.py @@ -457,8 +457,7 @@ def assign_meaning_to_intraoperative_detections( extracted_data["bp_and_hr"] = extract_heart_rate_and_blood_pressure( bp_and_hr_dets, - time_clusters, - mmhg_clusters, + legend_locations, ) # extract physiological indicators @@ -583,9 +582,20 @@ def digitize_intraop_record(image: Image.Image) -> Dict: ett_size: Dict = {"ett_size": extract_ett_size(digit_detections, *image.size)} # get legend locations + legend_tile_size: int = compute_tile_size( + MODEL_CONFIG["whole_number_legend"], image.size + ) + legend_detections = detect_objects_using_tiling( + image, + LEGEND_MODEL, + legend_tile_size, + legend_tile_size, + MODEL_CONFIG["whole_number_legend"]["horz_overlap_proportion"], + MODEL_CONFIG["whole_number_legend"]["vert_overlap_proportion"], + ) legend_locations: Dict[str, Tuple[float, float]] = find_legend( - intraop_detections_dict["legend"], - **image_size, + legend_detections, + *image.size, ) # extract inhaled volatile drugs @@ -597,7 +607,7 @@ def digitize_intraop_record(image: Image.Image) -> Dict: # extract bp and hr bp_and_hr: Dict = { - "bp_and_hr": make_bp_and_hr_detections(image, time_clusters, mmhg_clusters) + "bp_and_hr": make_bp_and_hr_detections(image, legend_locations) } # extract physiological indicators @@ -894,8 +904,7 @@ def compute_tile_size(model_config: Dict, image_size: Tuple[int, int]) -> int: def make_bp_and_hr_detections( image: Image.Image, - time_clusters: List[Cluster], - mmhg_clusters: List[Cluster], + legend: Dict[str, Tuple[float, float]] ) -> Dict: """Finds blood pressure symbols and associates a value and timestamp to them. @@ -940,9 +949,7 @@ def make_bp_and_hr_detections( ) dets: List[Detection] = sys_dets + dia_dets + hr_dets - bp_and_hr = extract_heart_rate_and_blood_pressure( - dets, time_clusters, mmhg_clusters - ) + bp_and_hr = extract_heart_rate_and_blood_pressure(dets, legend) return bp_and_hr From 88297d8056e6e1c9f00bd365867707db01f571c8 Mon Sep 17 00:00:00 2001 From: Ryan Folks Date: Fri, 8 Aug 2025 13:58:12 -0400 Subject: [PATCH 08/13] Fix docstring. --- src/ChartExtractor/extraction/extraction.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/ChartExtractor/extraction/extraction.py b/src/ChartExtractor/extraction/extraction.py index 8ae90ee..b9e104a 100644 --- a/src/ChartExtractor/extraction/extraction.py +++ b/src/ChartExtractor/extraction/extraction.py @@ -911,10 +911,8 @@ def make_bp_and_hr_detections( Args: `image` (Image.Image): The image to detect on. - `time_clusters` (List[Cluster]): - A list of Cluster objects encoding the location of the time legend. - `mmhg_clusters` (List[Cluster]): - A list of Cluster objects encoding the location of the mmhg/bpm legend. + `legend` (Dict[str, Tuple[float, float]]): + The dictionary that maps the name of legend entries to their locations on the image. Returns: A dictionary mapping timestamps to values for systolic, diastolic, and heart rate. From cf22e8e88ed916208ddd870634f953c9925cc7c7 Mon Sep 17 00:00:00 2001 From: Ryan Folks Date: Wed, 13 Aug 2025 15:22:09 -0400 Subject: [PATCH 09/13] Change the name of the checkbox tile size to match other variables in the function scope. --- src/ChartExtractor/extraction/extraction.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/ChartExtractor/extraction/extraction.py b/src/ChartExtractor/extraction/extraction.py index b9e104a..ae62d76 100644 --- a/src/ChartExtractor/extraction/extraction.py +++ b/src/ChartExtractor/extraction/extraction.py @@ -230,12 +230,12 @@ def run_intraoperative_models(intraop_image: Image.Image) -> Dict[str, List[Dete ) # checkboxes - tile_size = compute_tile_size(MODEL_CONFIG["checkboxes"], intraop_image.size) + ckbx_tile_size = compute_tile_size(MODEL_CONFIG["checkboxes"], intraop_image.size) detections_dict["checkboxes"] = detect_objects_using_tiling( intraop_image, CHECKBOXES_MODEL, - tile_size, - tile_size, + ckbx_tile_size, + ckbx_tile_size, MODEL_CONFIG["checkboxes"]["horz_overlap_proportion"], MODEL_CONFIG["checkboxes"]["vert_overlap_proportion"], nms_threshold=0.8, @@ -434,7 +434,8 @@ def assign_meaning_to_intraoperative_detections( # get legend locations legend_locations: Dict[str, Tuple[float, float]] = find_legend( intraop_detections_dict["legend"], - **image_size, + image_size[0], + image_size[1], ) # extract inhaled volatile drugs From a67d3cf37f82bfbd70de1625dab67adc90fc4e08 Mon Sep 17 00:00:00 2001 From: Ryan Folks Date: Fri, 15 Aug 2025 12:42:57 -0400 Subject: [PATCH 10/13] Change Detection's from_dict method's type hints to clarify that annotation type is not an instance of BoundingBox or Keypoint, but is instead the types BoundingBox or Keypoint. --- src/ChartExtractor/utilities/detections.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/ChartExtractor/utilities/detections.py b/src/ChartExtractor/utilities/detections.py index 63a807f..2275949 100644 --- a/src/ChartExtractor/utilities/detections.py +++ b/src/ChartExtractor/utilities/detections.py @@ -31,7 +31,10 @@ class Detection: confidence: float @staticmethod - def from_dict(detection_dict: Dict[str, Any], annotation_type: Union[BoundingBox, Keypoint]): + def from_dict( + detection_dict: Dict[str, Any], + annotation_type: Union[type[BoundingBox], type[Keypoint]] + ) -> "Detection": """Creates a `Detection` from a dictionary of data. Args: From ead41ffd17964669e97f3bdab874ad7395a290fd Mon Sep 17 00:00:00 2001 From: Ryan Folks Date: Thu, 28 Aug 2025 15:27:45 -0400 Subject: [PATCH 11/13] Fix issue where preop postop timing was overwriting intraop timing. --- src/ChartExtractor/extraction/extraction.py | 2 +- .../extraction/preoperative_postoperative_digit_boxes.py | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/ChartExtractor/extraction/extraction.py b/src/ChartExtractor/extraction/extraction.py index ae62d76..1673233 100644 --- a/src/ChartExtractor/extraction/extraction.py +++ b/src/ChartExtractor/extraction/extraction.py @@ -424,7 +424,7 @@ def assign_meaning_to_intraoperative_detections( extracted_data["codes"] = extract_drug_codes( corrected_detections_dict["numbers"], *image_size ) - extracted_data["timing"] = extract_surgical_timing( + extracted_data["intraoperative_timing"] = extract_surgical_timing( corrected_detections_dict["numbers"], *image_size ) extracted_data["ett_size"] = extract_ett_size( diff --git a/src/ChartExtractor/extraction/preoperative_postoperative_digit_boxes.py b/src/ChartExtractor/extraction/preoperative_postoperative_digit_boxes.py index 603cf3c..7de6390 100644 --- a/src/ChartExtractor/extraction/preoperative_postoperative_digit_boxes.py +++ b/src/ChartExtractor/extraction/preoperative_postoperative_digit_boxes.py @@ -384,7 +384,11 @@ def extract_preop_postop_digit_data( A dictionary with all the preoperative and postoperative data. """ data: Dict[str, str] = dict() - data["timing"] = extract_time_of_assessment(number_detections, im_width, im_height) + data["preoperative_postoperative_timing"] = extract_time_of_assessment( + number_detections, + im_width, + im_height + ) data["age"] = extract_age(number_detections, im_width, im_height) data["height"] = extract_height(number_detections, im_width, im_height) data["weight"] = extract_weight(number_detections, im_width, im_height) From e8f6f4454469fd7de0602e2be200cd23e88e85aa Mon Sep 17 00:00:00 2001 From: Ryan Folks Date: Thu, 28 Aug 2025 15:31:42 -0400 Subject: [PATCH 12/13] Fix same issue in different function. --- src/ChartExtractor/extraction/extraction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ChartExtractor/extraction/extraction.py b/src/ChartExtractor/extraction/extraction.py index 1673233..f3fbd7a 100644 --- a/src/ChartExtractor/extraction/extraction.py +++ b/src/ChartExtractor/extraction/extraction.py @@ -579,7 +579,7 @@ def digitize_intraop_record(image: Image.Image) -> Dict: # extract drug code and surgical timing codes: Dict = {"codes": extract_drug_codes(digit_detections, *image.size)} - times: Dict = {"timing": extract_surgical_timing(digit_detections, *image.size)} + times: Dict = {"intraoperative_timing": extract_surgical_timing(digit_detections, *image.size)} ett_size: Dict = {"ett_size": extract_ett_size(digit_detections, *image.size)} # get legend locations From 9cc61e65da1ab456cc7f7e36b7caf8c17e6b8a05 Mon Sep 17 00:00:00 2001 From: RyanDoesMath Date: Fri, 12 Sep 2025 14:15:00 -0400 Subject: [PATCH 13/13] remove leading zeros if someone writes them in the timing section. --- src/ChartExtractor/extraction/intraoperative_digit_boxes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ChartExtractor/extraction/intraoperative_digit_boxes.py b/src/ChartExtractor/extraction/intraoperative_digit_boxes.py index 45374ba..f0ccef1 100644 --- a/src/ChartExtractor/extraction/intraoperative_digit_boxes.py +++ b/src/ChartExtractor/extraction/intraoperative_digit_boxes.py @@ -143,7 +143,7 @@ def extract_surgical_timing( tens_place_val: Optional[int] = surgical_timing_values.get(prefix + "_tens") ones_place_val: Optional[int] = surgical_timing_values.get(prefix + "_ones") if None not in [tens_place_val, ones_place_val]: - surgical_timing[prefix] = str(tens_place_val.category) + str( - ones_place_val.category + surgical_timing[prefix] = str( + int(str(tens_place_val.category) + str(ones_place_val.category)) ) return surgical_timing