import numpy
import astropy.io.fits
import scipy
import skimage
import pandas
import os
import glob
from numba import jit
import matplotlib.pyplot
import matplotlib.animation
import tqdm
import multiprocessing
import time
from typing import Union
from pathos.multiprocessing import ProcessingPool
[docs]
class color:
PURPLE = '\033[95m'
CYAN = '\033[96m'
DARKCYAN = '\033[36m'
BLUE = '\033[94m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
END = '\033[0m'
[docs]
def track_all(datapath: str, cores: int, min_distance: int, l_thr: float, h_thr: float, min_size: int, dx: float, dt: float, sign: str, separation: bool, verbose:bool=False, doppler:bool =False) -> None:
"""
Executes a pipeline for feature detection, identification, association, tabulation, and data storage based on astronomical FITS files.
Parameters:
- datapath (str): Path to the main data directory.
- cores (int): Number of CPU cores to utilize for parallel processing.
- min_distance (int): Minimum distance between features for detection.
- l_thr (int): Threshold value for feature detection.
- min_size (int): Minimum size threshold for identified features.
- dx (float): Pixel size in the x-direction (spatial resolution) for velocity computation.
- dt (float): Time interval between frames (temporal resolution) for velocity computation.
- sign (str): Sign convention for feature detection ('positive', 'negative', or 'both').
- separation (int): Separation threshold for feature detection.
- verbose (bool, optional): If True, displays detailed progress information. Default is False.
- doppler (bool, optional): If True, includes Doppler files for tabulation. Default is False.
Returns:
- None: Outputs are saved as FITS files and a JSON file containing tabulated data.
Raises:
- FileNotFoundError: If there are issues with file paths or missing directories.
Notes:
- This function coordinates the detection, identification, association, tabulation, and storage of astronomical features
across multiple FITS files in the specified 'datapath'.
- The process involves multiple subprocesses, including cleaning up temporary files, feature detection, ID assignment,
association of features across frames, tabulation of feature properties (such as position, area, and flux), and
saving the resulting tabulated data as a JSON file.
PSA: This docstring has been written with the aid of AI.
"""
# Load the data
data = sorted(glob.glob(datapath+"00-data/*.fits"))
# Set the number of workers
number_of_workers = numpy.min([len(data), cores])
# Ensure at least one worker
if number_of_workers < 1:
number_of_workers = 1
print(color.RED + color.BOLD + f"Number of cores used: {number_of_workers}" + color.END)
start = time.time()
# Clean up
print(color.RED + color.BOLD + "Cleaning up" + color.END)
housekeeping(datapath)
# Start the detection and identification
print(color.RED + color.BOLD + "Detecting features..." + color.END)
with multiprocessing.Pool(number_of_workers) as p:
p.starmap(process_image, [(datapath, img, l_thr, h_thr, min_distance, sign, separation, min_size, verbose) for img in data])
# Assign unique IDs
print(color.RED + color.BOLD + "Assigning unique IDs..." + color.END)
id_data = sorted(glob.glob(datapath+"02-id/*.fits"))
unique_id(id_data, datapath, verbose)
print(color.GREEN + color.BOLD + "Feature detection step ended" + color.END)
print(color.RED + color.BOLD + "Associating features..." + color.END)
# Associate the detections
associate(datapath, verbose, number_of_workers)
# delet all temp folders regardless of the files inside
print(color.RED + color.BOLD + "Cleaning up" + color.END)
os.system(f"rm -rf {datapath}temp*")
# Start the tabulation
print(color.RED + color.BOLD + "Starting tabulation" + color.END)
asc_files = sorted(glob.glob(os.path.join(datapath,"03-assoc/*.fits")))
src_files = sorted(glob.glob(os.path.join(datapath+"00-data/*.fits")))
if len(asc_files) == 0 or len(src_files) == 0:
print(color.RED + color.BOLD + "No association or source files found for tabulation." + color.END)
return
if doppler:
doppler_files = sorted(glob.glob(os.path.join(datapath+"00b-doppler/*.fits")))
# Give an error if the path is not found
# This feature is not yet tested, if any error occurs, please open an issue on github.
if len(doppler_files) == 0:
raise FileNotFoundError("No Doppler files found")
df = tabulation_parallel_doppler(asc_files, doppler_files, src_files, dx, dt, cores)
else:
df = tabulation_parallel(asc_files, src_files, dx, dt, cores)
# Save the dataframe
df.to_json(os.path.join(datapath+"dataframe.json"))
end = time.time()
print(color.GREEN + color.BOLD + "Dataframe saved" + color.END)
print(color.YELLOW + color.BOLD + f"Number of elements tracked: {len(df)}" + color.END)
print(color.PURPLE + color.BOLD + f"Time elapsed: {end - start} seconds" + color.END)
###################################
##### HELPER FUNCTIONS ############
###################################
[docs]
def housekeeping(datapath: str) -> None:
"""
Ensures the existence and proper state of specific directories and their contents within a given data path.
This function performs the following tasks:
1. Checks if the directories "01-mask", "02-id", and "03-assoc" exist within the specified datapath.
If none of these directories exist, it creates them.
2. If the directories exist, it checks for files within them.
- If all three directories contain the same number of files and are not empty, it prompts a warning
message indicating that the directories are not empty and proceeds to delete all files in these directories.
- If the number of files in "01-mask" and "02-id" do not match, it deletes all files in these two directories.
- If the directories are empty, it prints a message indicating so.
Parameters:
datapath (str): The path where the directories "01-mask", "02-id", and "03-assoc" are located or should be created.
Note:
This function assumes the existence of a `color` class with BOLD, RED, GREEN, and END attributes for formatting output.
Example:
housekeeping("/path/to/root/data/folder/")
PSA: This docstring has been written with the assistance of AI.
"""
if not os.path.exists(datapath+"01-mask") and not os.path.exists(datapath+"02-id") and not os.path.exists(datapath+"03-assoc"):
os.makedirs(datapath+"01-mask")
os.makedirs(datapath+"02-id")
os.makedirs(datapath+"03-assoc")
else:
files_mask = glob.glob(datapath+"01-mask/*")
files_id = glob.glob(datapath+"02-id/*")
files_assoc = glob.glob(datapath+"03-assoc/*")
if len(files_mask) == len(files_id) == len(files_assoc) != 0:
print(color.BOLD + color.RED + "WARNING: The directories are not empy. Deleting files..." + color.END)
response = "y"
if response == "y":
for file in files_mask:
os.remove(file)
for file in files_id:
os.remove(file)
for file in files_assoc:
os.remove(file)
print(color.BOLD + color.GREEN + "Files deleted successfully." + color.END)
else:
pass
print(color.BOLD + color.GREEN + "Files not deleted." + color.END)
elif len(files_mask) != len(files_id):
print("The number of files in the directories 01-mask and 02-id do not match. Deleting all files.")
for file in files_mask:
os.remove(file)
for file in files_id:
os.remove(file)
else:
pass
print("The directories are empty.")
[docs]
def peak_local_max(img, min_dist, h_thr, sign):
"""
Detect local maxima (or minima) in an image while enforcing a minimum
distance between detected peaks.
Parameters
----------
img : ndarray
Input image (2D).
min_dist : int, optional
Minimum allowed Euclidean distance between detected peaks. Default is 5.
h_thr : float, optional
Threshold value for preprocessing peaks. Default is 0.5.
sign : {"pos", "neg"}
Whether to detect positive ("pos") or negative ("neg") peaks.
Returns
-------
centroids : ndarray of shape (N, 2)
Array of (row, col) centroid coordinates of detected peaks.
"""
# Preprocess
if sign == "neg":
img_proc = img_pre_neg(img, h_thr)
elif sign == "pos":
img_proc = img_pre_pos(img, h_thr)
else:
raise ValueError('`sign` must be either "neg" or "pos".')
# Label connected components on binary mask
mask = img_proc > 0
labels = skimage.measure.label(mask)
props = skimage.measure.regionprops_table(labels, properties=["centroid"])
centroids = numpy.column_stack((props["centroid-0"], props["centroid-1"]))
if centroids.size == 0:
raise RuntimeWarning("No centroids found — try lowering the threshold.")
# Compute pairwise distances between centroids
diff = centroids[:, None, :] - centroids[None, :, :]
dist_matrix = numpy.sqrt(numpy.sum(diff**2, axis=-1))
# Find pairs of centroids closer than min_dist
triu_indices = numpy.triu_indices(len(centroids), k=1)
close_pairs = numpy.where(dist_matrix[triu_indices] < min_dist)[0]
# Decide which centroid to remove in each close pair
to_remove = set()
for idx in close_pairs:
i, j = triu_indices[0][idx], triu_indices[1][idx]
val_i = img[int(round(centroids[i][0])), int(round(centroids[i][1]))]
val_j = img[int(round(centroids[j][0])), int(round(centroids[j][1]))]
to_remove.add(i if val_i < val_j else j)
# Remove centroids that are too close to stronger neighbors
if to_remove:
centroids = numpy.delete(centroids, list(to_remove), axis=0)
return centroids
[docs]
def img_pre_pos(img: numpy.ndarray, thr: float) -> numpy.ndarray:
img_pos = img.copy()
img_pos[img_pos < 0] = 0
img_pos = numpy.array(img_pos, dtype=numpy.float64)
img_pos[img_pos < thr] = 0
return img_pos
[docs]
def img_pre_neg(img: numpy.ndarray, l_thr: float) -> numpy.ndarray:
img_neg = img.copy()
img_neg = -1*numpy.array(img_neg, dtype=numpy.float64)
img_neg[img_neg < 0] = 0
img_neg[img_neg < l_thr] = 0
return img_neg
[docs]
def watershed_routine(img: numpy.ndarray, l_thr:float, h_thr:float, min_dist: int, sign: str, separation:bool = False) -> tuple[numpy.ndarray, numpy.ndarray]:
if sign == "neg":
img_low = img_pre_neg(img, l_thr)
elif sign == "pos":
img_low = img_pre_pos(img, l_thr)
else:
raise ValueError('sign must be "neg" or "pos"')
distance = scipy.ndimage.distance_transform_edt(img_low)
coords = peak_local_max(img, min_dist, h_thr, sign)
mask = numpy.zeros(distance.shape, dtype=bool)
mask[tuple(coords.astype(int).T)] = True
markers, _ = scipy.ndimage.label(mask)
labels_line = skimage.segmentation.watershed(-distance, markers, mask=img_low, compactness=10, watershed_line=separation)
return labels_line, coords
###################################
##### MAIN FUNCTIONS ##############
###################################
[docs]
def detection(img: numpy.ndarray, l_thr: float, h_thr: float, min_distance:int,sign:str="both", separation:bool=False, verbose:bool=False) -> Union[tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray], tuple[numpy.ndarray, numpy.ndarray], tuple[numpy.ndarray, numpy.ndarray]]:
"""
Detects features in an image using a threshold and watershed algorithm based on the specified sign of features.
This function processes an input image to detect features either of positive values, negative values, or both.
The detection is based on thresholding and the watershed algorithm, which can be applied with or without separation.
Parameters:
img (numpy.ndarray): The input image to process.
l_thr (int): The threshold value for detection.
min_distance (int): The minimum distance between features for the watershed algorithm.
sign (str, optional): Specifies the type of features to detect. Options are "both", "pos", or "neg".
Default is "both".
separation (bool, optional): If True, applies separation in the watershed routine. Default is False.
Returns:
Union[tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray], tuple[numpy.ndarray, numpy.ndarray]]:
- If sign is "both": Returns a tuple of three elements:
labels (numpy.ndarray): The combined labels of detected features.
coords_pos (numpy.ndarray): Coordinates of positive features.
coords_neg (numpy.ndarray): Coordinates of negative features.
- If sign is "pos" or "neg": Returns a tuple of two elements:
labels (numpy.ndarray): The labels of detected features.
coords (numpy.ndarray): Coordinates of the detected features.
Raises:
ValueError: If sign is not "both", "pos", or "neg".
Example:
labels, coords_pos, coords_neg = detection(img, l_thr=50, min_distance=10, sign="both", separation=True)
PSA: This docstring has been written with the assistance of AI.
"""
img = numpy.array(img)
if sign == "both":
labels_pos,_ = watershed_routine(img, l_thr, h_thr, min_distance, "pos", separation)
labels_neg,_ = watershed_routine(img, l_thr, h_thr, min_distance, "neg", separation)
labels_neg = -1*labels_neg
labels = labels_pos + labels_neg
if verbose:
print(f"Number of clumps detected: {len(numpy.unique(labels))-1}")
return labels
elif sign == "pos":
labels_pos,_ = watershed_routine(img, l_thr, h_thr, min_distance, "pos", separation)
return labels_pos
elif sign == "neg":
labels_neg,_ = watershed_routine(img, l_thr, h_thr, min_distance, "pos", separation)
return labels_neg
else:
raise ValueError('sign must be "both", "pos" or "neg"')
[docs]
def identification(labels: numpy.ndarray, min_size: int, verbose:bool = False) -> numpy.ndarray:
"""
Identifies and filters clumps in the input label array based on a minimum size threshold.
This function processes the input label array, retaining only those clumps (connected components) that meet
the specified minimum size. Clumps smaller than the minimum size are removed (set to zero).
Parameters:
labels (numpy.ndarray): The input array of labels representing different clumps.
min_size (int): The minimum size (number of pixels) a clump must have to be retained.
Returns:
numpy.ndarray: The filtered label array with only clumps meeting the minimum size retained.
Raises:
ValueError: If no clumps survive the identification process.
Note:
Future versions may include a "verbose" option to print the number of clumps removed.
Example:
filtered_labels = identification(labels, min_size=50)
PSA: This docstring has been written with the assistance of AI.
"""
count = 0
uid = numpy.unique(labels)
original_number = len(uid)
if verbose:
print(f"Number of clumps detected: {original_number-1}")
for k in tqdm.tqdm(uid, leave=False):
sz = numpy.where(labels == k)
if len(sz[0]) < min_size:
labels = numpy.where(labels == k, 0, labels)
count+=1
num = original_number - count
if verbose:
print(f"Number of clumps surviving the identification process: {num}")
if num == 0:
raise ValueError("No clumps survived the identification process")
else:
if verbose:
print(f"Number of clumps surviving the identification process: {num}")
pass
return labels
[docs]
def process_image(datapath: str, data: str, l_thr: float, h_thr: float, min_distance: int, sign:str="both", separation:bool=True, min_size:int=4, verbose:bool=False) -> None:
"""
Processes an astronomical image by detecting and identifying clumps within it, and saves the results.
This function performs the following steps:
1. Reads the input FITS image file.
2. Detects clumps in the image using the specified detection parameters.
3. Saves the detected clumps to the "01-mask" directory.
4. Identifies and filters clumps based on the minimum size.
5. Saves the filtered clumps to the "02-id" directory.
Parameters:
datapath (str): The path where the results will be saved.
data (str): The path to the input FITS image file.
l_thr (int): The threshold value for clump detection.
min_distance (int): The minimum distance between detected clumps.
sign (str, optional): Specifies the type of clumps to detect. Options are "both", "pos", or "neg".
Default is "both".
separation (bool, optional): If True, applies separation in the detection routine. Default is True.
min_size (int, optional): The minimum size (number of pixels) a clump must have to be retained. Default is 4.
Returns:
None
Example:
process_image("/path/to/data/", "image.fits", l_thr=50, min_distance=10, sign="both", separation=True, min_size=4)
PSA: This docstring has been written with the assistance of AI.
"""
image = astropy.io.fits.getdata(data, memmap=False)
labels = detection(image, l_thr, h_thr, min_distance, sign=sign, separation=separation, verbose=verbose)
astropy.io.fits.writeto(datapath+f"01-mask/{data.split(os.sep)[-1]}", labels, overwrite=True)
labels = identification(labels, min_size, verbose=verbose)
astropy.io.fits.writeto(datapath+f"02-id/{data.split(os.sep)[-1]}", labels, overwrite=True)
[docs]
def unique_id(id_data: str, datapath: str, verbose:bool=False) -> None:
"""
Assigns unique IDs to clumps in a list of FITS image files and saves the modified files.
This function processes each FITS file in the provided list, replacing each unique non-zero clump identifier
with a globally unique ID. The modified images are saved in the "02-id" directory within the specified datapath.
Parameters:
id_data (list): A list of paths to the FITS files to be processed.
datapath (str): The path where the modified files will be saved.
Returns:
None
Example:
unique_id(["image1.fits", "image2.fits"], "/path/to/data/")
PSA: This docstring has been written with the assistance of AI.
"""
u_id_p = 1
u_id_n = -1
for filename in tqdm.tqdm(id_data):
img_n0 = astropy.io.fits.getdata(filename, memmap=False)
# Extract unique non-zero values
ids = numpy.unique(img_n0[img_n0 != 0])
ids_p = ids[ids > 0]
ids_n = ids[ids < 0]
# Replace each unique value with its corresponding unique ID
for i in ids_p:
img_n0[img_n0 == i] = u_id_p
u_id_p += 1
for i in ids_n:
img_n0[img_n0 == i] = u_id_n
u_id_n -= 1
# Write the modified data back to the file
astropy.io.fits.writeto(os.path.join(datapath, "02-id", os.path.basename(filename)), img_n0, overwrite=True)
if verbose:
print(f"Total number of unique IDs: {u_id_p+abs(u_id_n)-1}")
[docs]
def array_row_intersection(a: numpy.ndarray,b:numpy.ndarray) -> numpy.ndarray:
"""
Finds the intersection of rows between two 2D numpy arrays.
This function identifies rows that are present in both input arrays `a` and `b` and returns the rows from `a`
that are also in `b`. The function is optimized for performance using numpy operations.
Parameters:
a (numpy.ndarray): A 2D numpy array.
b (numpy.ndarray): A 2D numpy array.
Returns:
numpy.ndarray: A 2D numpy array containing the rows from `a` that are also present in `b`.
Note:
This function is adapted from a solution provided by Vasilis Lemonidis on Stack Overflow.
All credit goes to the original author. Source: https://stackoverflow.com/a/40600991
Example:
a = np.array([[1, 2], [3, 4], [5, 6]])
b = np.array([[3, 4], [7, 8]])
result = array_row_intersection(a, b)
# result will be array([[3, 4]])
PSA: This docstring has been written with the assistance of AI.
"""
tmp=numpy.prod(numpy.swapaxes(a[:,:,None],1,2)==b,axis=2)
return a[numpy.sum(numpy.cumsum(tmp,axis=0)*tmp==1,axis=1).astype(bool)]
[docs]
def back_and_forth_matching_PARALLEL(fname1: str, fname2: str, round: int, datapath: str, verbose:bool=False) -> None:
"""
Performs parallel forward and backward matching of unique IDs between two FITS files.
This function reads two FITS files, performs forward and backward matching of unique IDs
based on the largest intersection of pixel coordinates, and saves the modified second FITS file.
Parameters:
fname1 (str): File path to the first FITS file.
fname2 (str): File path to the second FITS file.
round (int): Round number or identifier for the current processing round.
datapath (str): Directory path where temporary and output files will be saved.
Returns:
None
Note:
This function assumes `array_row_intersection` function is defined elsewhere in your code.
It uses tqdm for progress monitoring and assumes numpy arrays for file operations.
Example:
back_and_forth_matching_PARALLEL('file1.fits', 'file2.fits', 1, '/path/to/data/')
PSA: This docstring has been written with the assistance of AI.
"""
cube1 = astropy.io.fits.getdata(fname1, memmap=False)
cube2 = astropy.io.fits.getdata(fname2, memmap=False)
file1 = cube1[-1] if cube1.ndim > 2 else cube1
file2 = cube2[0] if cube2.ndim > 2 else cube2
unique_id_1 = numpy.unique(file1)
unique_id_1 = unique_id_1[unique_id_1 != 0]
# create two empty 1D arrays to store the forward_1 and forward_2 matches
forward_matches_1 = numpy.empty(0)
forward_matches_2 = numpy.empty(0)
for id_1 in tqdm.tqdm(unique_id_1, leave=False, desc="Forward matching"):
try:
wh1 = numpy.where(file1 == id_1)
set1 = numpy.stack((wh1[0], wh1[1])).T
except:
print(f"Error in forward matching for id_1: {id_1}. Skipping.")
print(f"Frame was {fname1.split(os.sep)[-1]} and round was {round}")
raise ValueError("Error in forward matching. Check the input files.")
max_intersection_size = 0
# create a mask of the element of the first image in the second image
temp_mask = numpy.where(file1 == id_1, 1, 0)
temp_file2 = file2 * temp_mask
unique_id_2 = numpy.unique(temp_file2)
unique_id_2 = unique_id_2[unique_id_2 != 0]
for id_2 in unique_id_2:
wh2 = numpy.where(file2 == id_2)
set2 = numpy.stack((wh2[0], wh2[1])).T
temp_intersection_size = len(array_row_intersection(set1, set2))
if temp_intersection_size > max_intersection_size:
max_intersection_size = temp_intersection_size
best_match_1 = id_1
best_match_2 = id_2
if max_intersection_size != 0:
forward_matches_1 = numpy.append(forward_matches_1, best_match_1)
forward_matches_2 = numpy.append(forward_matches_2, best_match_2)
unique_id_2 = numpy.unique(file2)
unique_id_2 = unique_id_2[unique_id_2 != 0]
backward_matches_1 = numpy.empty(0)
backward_matches_2 = numpy.empty(0)
for id_2 in tqdm.tqdm(unique_id_2, leave=False, desc="Backward matching"):
try:
wh2 = numpy.where(file2 == id_2)
set2 = numpy.stack((wh2[0], wh2[1])).T
except:
print(f"Error in backward matching for id_2: {id_2}. Skipping.")
print(f"Frame was {fname2.split(os.sep)[-1]} and round was {round}")
raise ValueError("Error in backward matching. Check the input files.")
max_intersection_size = 0
# create a mask of the element of the first image in the second image
temp_mask = numpy.where(file2 == id_2, 1, 0)
temp_file1 = file1 * temp_mask
unique_id_1 = numpy.unique(temp_file1)
unique_id_1 = unique_id_1[unique_id_1 != 0]
for id_1 in unique_id_1:
wh1 = numpy.where(file1 == id_1)
set1 = numpy.stack((wh1[0], wh1[1])).T
temp_intersection_size = len(array_row_intersection(set1, set2))
if temp_intersection_size > max_intersection_size:
max_intersection_size = temp_intersection_size
best_match_1 = id_1
best_match_2 = id_2
if max_intersection_size != 0:
backward_matches_1 = numpy.append(backward_matches_1, best_match_1)
backward_matches_2 = numpy.append(backward_matches_2, best_match_2)
# consider only the matches that are mutual
mutual_matches_1 = numpy.empty(0)
mutual_matches_2 = numpy.empty(0)
for kk in tqdm.tqdm(range(len(forward_matches_1)), leave=False, desc="Mutual matching"):
if forward_matches_1[kk] in backward_matches_1 and forward_matches_2[kk] in backward_matches_2:
fwm1 = forward_matches_1[kk]
fwm2 = forward_matches_2[kk]
mutual_matches_1 = numpy.append(mutual_matches_1, fwm1)
mutual_matches_2 = numpy.append(mutual_matches_2, fwm2)
for idx in tqdm.tqdm(range(len(mutual_matches_1)), leave=False, desc="Replacing"):
numpy.place(cube2, cube2 == mutual_matches_2[idx], mutual_matches_1[idx])
# append vertically the frames in cube1 and cube2
if len(numpy.shape(cube1)) == 2:
cube1 = cube1.reshape(1, cube1.shape[0], cube1.shape[1])
if len(numpy.shape(cube2)) == 2:
cube2 = cube2.reshape(1, cube2.shape[0], cube2.shape[1])
cube2 = numpy.concatenate((cube1, cube2), axis=0)
astropy.io.fits.writeto(datapath+f"temp{round}/{fname1.split(os.sep)[-1]}", cube2, overwrite=True)
if verbose:
print(color.YELLOW + f"Done with {fname1.split(os.sep)[-1]}, {fname2.split(os.sep)[-1]}" + color.END)
[docs]
def associate(datapath: str, verbose:bool=False, number_of_workers:int=None) -> None:
"""
Perform association of FITS files using parallel processing.
This function processes FITS files in `datapath/02-id` directory, performing association
using the `back_and_forth_matching_PARALLEL` function. It divides the data into subgroups,
processes each subgroup in parallel using multiprocessing, and saves the associated results
in `datapath/03-assoc` directory.
Parameters:
datapath (str): The base directory path containing the data.
verbose (bool): set to true to print additional informations.
number_of_workers (int): number of workers for the parallel work
Returns:
None
Note:
This function assumes the presence of necessary directories (`02-id` and `03-assoc`) and
uses multiprocessing for parallelization. It also assumes the availability of `back_and_forth_matching_PARALLEL`
function and `color` for colored console output.
Example:
associate("/path/to/data/")
PSA: This docstring has been written with the assistance of AI.
"""
id_data = sorted(glob.glob(datapath+"02-id/*.fits"))
round = 0
# make a folder named temp to store the results of the rounds of association
os.makedirs(datapath+"temp0", exist_ok=True)
# divide id_data in subgroups of 2 frames, in case of odd number of frames, the last frame is kept alone
subgroups = [id_data[i:i+2] for i in range(0, len(id_data), 2)]
if len(subgroups) > 0 and len(subgroups[-1]) == 1:
img = astropy.io.fits.getdata(subgroups[-1][0], memmap=False)
img = img.reshape(1, img.shape[0], img.shape[1])
astropy.io.fits.writeto(datapath+f"temp{round}/{subgroups[-1][0].split(os.sep)[-1]}", img, overwrite=True)
subgroups = subgroups[:-1]
# parallelize the association by using the multiprocessing module on each subgroup
print(color.RED + color.BOLD + "Starting the first round of association" + color.END)
args = [(subgroup[0], subgroup[1], round, datapath, verbose) for subgroup in subgroups]
pool = multiprocessing.Pool(processes=number_of_workers)
results = pool.starmap(back_and_forth_matching_PARALLEL, args)
pool.close()
pool.join()
max_iter = 10
# subsequent rounds of association
for round in range(1, max_iter):
# load the data
data = sorted(glob.glob(datapath+f"temp{round-1}/*.fits"))
os.makedirs(datapath+f"temp{round}", exist_ok=True)
if len(data) < 2:
break
# divide id_data in subgroups of 2 frames, in case of odd number of frames, the last frame is kept alone
subgroups = [data[i:i+2] for i in range(0, len(data), 2)]
if len(subgroups) > 0 and len(subgroups[-1]) == 1:
img = astropy.io.fits.getdata(subgroups[-1][0], memmap=False)
img = img.reshape(img.shape[0], img.shape[1], img.shape[2])
astropy.io.fits.writeto(datapath+f"temp{round}/{subgroups[-1][0].split(os.sep)[-1]}", img, overwrite=True)
subgroups = subgroups[:-1]
print(color.RED + color.BOLD + f"Starting the {round+1} round of association" + color.END)
args = [(subgroup[0], subgroup[1], round, datapath, verbose) for subgroup in subgroups]
pool = multiprocessing.Pool(processes=number_of_workers)
results = pool.starmap(back_and_forth_matching_PARALLEL, args)
pool.close()
pool.join()
# Check if there are any results to process
final_files = sorted(glob.glob(datapath+f"temp{round-1}/*.fits"))
if not final_files:
print(color.RED + color.BOLD + "No associated files found." + color.END)
return
data = astropy.io.fits.getdata(final_files[0], memmap=False)
# export each frame of the data cube as a fits file in 03_assoc
for i in range(data.shape[0]):
astropy.io.fits.writeto(datapath+f"03-assoc/{i:04d}.fits", data[i, :, :], overwrite=True)
print(color.GREEN + color.BOLD + "Finished association" + color.END)
[docs]
def tabulation_parallel(files: list, filesB: list, dx: float, dt: float, cores: int, minliftime: int = 4) -> pandas.DataFrame:
"""
Process segmentation maps (files) and source FITS files (filesB) in parallel.
Extracts blob properties (centroid, area, flux, eccentricity), tracks them across frames,
and computes velocities.
Parameters
----------
files : list
List of segmentation FITS files (each pixel labeled by blob ID).
filesB : list
List of source FITS files with intensity values.
dx : float
Spatial resolution (e.g. arcsec/pixel).
dt : float
Temporal resolution (frame cadence).
cores : int
Number of CPU cores for parallel processing.
minliftime : int, optional
Minimum number of frames a blob must persist to be included (default=4).
Returns
-------
pandas.DataFrame
Summary table with one row per tracked blob:
label, Lifetime, arrays of X, Y, Area, Flux, Frames, ecc, Vx, Vy, stdVx, stdVy.
"""
# Build coordinate grids once
img = astropy.io.fits.getdata(filesB[0], memmap=False)
size = numpy.shape(img)
x_1, y_1 = numpy.meshgrid(numpy.arange(size[1]), numpy.arange(size[0]))
def process_file(j):
"""Process a single frame and return DataFrame of blob properties."""
src_img = astropy.io.fits.getdata(filesB[j], memmap=False)
asc_img = astropy.io.fits.getdata(files[j], memmap=False)
unique_ids = numpy.unique(asc_img)
records = []
for i in tqdm.tqdm(unique_ids, leave=False, desc=f"Frame {j}"):
if i == 0: # skip background
continue
mask = (asc_img == i)
Bm = src_img * mask
Area = mask.sum()
if Area == 0 or Bm.sum() == 0:
continue
Flux = Bm.sum() / Area
X = ((mask * x_1) * Bm).sum() / Bm.sum()
Y = ((mask * y_1) * Bm).sum() / Bm.sum()
r = numpy.sqrt(Area / numpy.pi)
circle = ((x_1 - X)**2 + (y_1 - Y)**2 < r**2).astype(int) * mask
Area_circle = circle.sum()
ecc = Area_circle / Area if Area > 0 else numpy.nan
records.append({
"label": i, "X": X, "Y": Y,
"Area": Area, "Flux": Flux,
"frame": j, "ecc": ecc
})
return pandas.DataFrame.from_records(records)
# Parallel execution
with ProcessingPool(cores) as p:
results = list(p.imap(process_file, range(len(files))))
df = pandas.concat(results, ignore_index=True)
# Merge by label
groups = df.groupby("label")
area_tot, flux_tot, X_tot, Y_tot = [], [], [], []
label_tot, frame_tot, ecc_tot = [], [], []
for name, group in groups:
area_temp = group["Area"].values
flux_temp = group["Flux"].values
X_temp = group["X"].values
Y_temp = group["Y"].values
label_temp = group["label"].values
frame_temp = group["frame"].values
ecc_temp = group["ecc"].values
# Sanity checks
if not (len(area_temp) == len(flux_temp) == len(X_temp) ==
len(Y_temp) == len(label_temp) == len(frame_temp)):
raise ValueError(f"Inconsistent lengths in group {name}")
if len(numpy.unique(label_temp)) > 1:
raise ValueError(f"More than one label in group {name}")
if numpy.any(numpy.diff(frame_temp) != 1):
raise ValueError(f"Frames are not consecutive for label {name}")
area_tot.append(area_temp)
flux_tot.append(flux_temp)
X_tot.append(X_temp)
Y_tot.append(Y_temp)
label_tot.append(label_temp)
frame_tot.append(frame_temp)
ecc_tot.append(ecc_temp)
# Final DataFrame
df_final = pandas.DataFrame({
"label": [x[0] for x in label_tot],
"Lifetime": [len(x) for x in frame_tot],
"X": X_tot, "Y": Y_tot,
"Area": area_tot, "Flux": flux_tot,
"Frames": frame_tot, "ecc": ecc_tot
})
# Apply lifetime filter
df_final = df_final[df_final["Lifetime"] >= minliftime]
# Compute velocities
vxtot, vytot, stdvxtot, stdvytot = [], [], [], []
for j in range(len(df_final)):
x = numpy.array(df_final["X"].iloc[j])
y = numpy.array(df_final["Y"].iloc[j])
vx = numpy.gradient(x) * dx / dt
vy = numpy.gradient(y) * dx / dt
vxtot.append(vx)
vytot.append(vy)
stdvxtot.append(numpy.std(vx))
stdvytot.append(numpy.std(vy))
df_final["Vx"] = vxtot
df_final["Vy"] = vytot
df_final["stdVx"] = stdvxtot
df_final["stdVy"] = stdvytot
df_final = df_final.reset_index(drop=True)
return df_final
[docs]
def tabulation_parallel_doppler(files: str, filesD: str, filesB: str, dx: float, dt: float, cores: int, minliftime: int = 4) -> pandas.DataFrame:
def process_file(j):
file = files[j]
src_img = astropy.io.fits.getdata(filesB[j], memmap=False)
asc_img = astropy.io.fits.getdata(file, memmap=False)
alt_img = astropy.io.fits.getdata(filesD[j], memmap=False)
unique_ids = numpy.unique(asc_img)
df_temp = pandas.DataFrame(columns=["label", "X", "Y", "Area", "Flux", "LOS_V", "frame", "ecc"])
for i in unique_ids:
if i == 0:
continue
mask = (asc_img == i)
Bm = src_img * mask
LosV_s = alt_img * mask
LosV_s[LosV_s == 0] = numpy.nan
Area = mask.sum()
Flux = Bm.sum() / Area
LosV = LosV_s # numpy.nanmean(LosV_s)
X = ((mask * x_1) * Bm).sum() / Bm.sum()
Y = ((mask * y_1) * Bm).sum() / Bm.sum()
r = numpy.sqrt(Area / numpy.pi)
circle = (x_1 - X)**2 + (y_1 - Y)**2 < r**2
circle = circle.astype(int)
circle = circle * mask
Area_circle = circle.sum()
ecc = Area_circle / Area
temp = pandas.DataFrame([[i, X, Y, Area, Flux, LosV, j, ecc]], columns=["label", "X", "Y", "Area", "Flux", "LOS_V", "frame", "ecc"])
df_temp = pandas.concat([df_temp, temp], ignore_index=False)
return df_temp
df = pandas.DataFrame(columns=["label", "X", "Y", "Area", "Flux", "LOS_V", "frame", "ecc"])
img = astropy.io.fits.getdata(filesB[0], memmap=False)
size = numpy.shape(img)
x_1, y_1 = numpy.meshgrid(numpy.arange(size[1]), numpy.arange(size[0]))
with ProcessingPool(cores) as p:
results = list(p.imap(process_file, range(len(files))))
for result in results:
df = pandas.concat([df, result], ignore_index=False)
# Merge the common labels
groups = df.groupby("label")
area_tot = []
flux_tot = []
losv_tot = []
X_tot = []
Y_tot = []
label_tot = []
frame_tot = []
ecc_tot = []
for name, group in tqdm(groups, desc="Merging common labels"):
area_temp = group["Area"].values
flux_temp = group["Flux"].values
losv_temp = group["LOS_V"].values
X_temp = group["X"].values
Y_temp = group["Y"].values
label_temp = group["label"].values
frame_temp = group["frame"].values
ecc_temp = group["ecc"].values
# Perform some sanity checks
if len(area_temp) != len(flux_temp):
raise ValueError("area and flux have different lengths")
if len(area_temp) != len(losv_temp):
raise ValueError("area and losv have different lengths")
if len(area_temp) != len(X_temp):
raise ValueError("area and X have different lengths")
if len(area_temp) != len(Y_temp):
raise ValueError("area and Y have different lengths")
if len(area_temp) != len(label_temp):
raise ValueError("area and label have different lengths")
if len(area_temp) != len(frame_temp):
raise ValueError("area and frame have different lengths")
if len(numpy.unique(label_temp)) > 1:
raise ValueError("More than one label in the group")
if numpy.any(numpy.diff(frame_temp) != 1):
raise ValueError("Frames are not consecutive")
area_tot.append(area_temp)
flux_tot.append(flux_temp)
losv_tot.append(losv_temp)
X_tot.append(X_temp)
Y_tot.append(Y_temp)
label_tot.append(label_temp)
frame_tot.append(frame_temp)
ecc_tot.append(ecc_temp)
df_final = pandas.DataFrame(columns=["label", "Lifetime", "X", "Y", "Area", "Flux", "LOS_V", "Frames", "ecc"])
df_final["label"] = [x[0] for x in label_tot]
df_final["Lifetime"] = [len(x) for x in frame_tot]
df_final["X"] = X_tot
df_final["Y"] = Y_tot
df_final["Area"] = area_tot
df_final["Flux"] = flux_tot
df_final["LOS_V"] = losv_tot
df_final["Frames"] = frame_tot
df_final["ecc"] = ecc_tot
df_final = df_final[df_final["Lifetime"] >= minliftime]
# Compute the velocities
vxtot = []
vytot = []
stdvxtot = []
stdvytot = []
for j in tqdm(range(len(df_final)), desc="Computing velocities"):
x = df_final["X"].iloc[j]
y = df_final["Y"].iloc[j]
x = numpy.array(x)
y = numpy.array(y)
vx = numpy.gradient(x) * dx / dt
vy = numpy.gradient(y) * dx / dt
stdx = numpy.std(vx)
stdy = numpy.std(vy)
vxtot.append(vx)
vytot.append(vy)
stdvxtot.append(stdx)
stdvytot.append(stdy)
df_final["Vx"] = vxtot
df_final["Vy"] = vytot
df_final["stdVx"] = stdvxtot
df_final["stdVy"] = stdvytot
df_final = df_final.reset_index(drop=True)
return df_final
###################################
###### DEPRECATED FUNCTIONS #######
###################################
[docs]
def tabulation(files: str, filesB: str,dx: float, dt: float, cores: int, minliftime:int=4) -> pandas.DataFrame:
"""
Analyzes a series of FITS files and their corresponding background files to extract
properties of labeled regions across frames, merge common labels, compute lifetime,
and calculate velocities.
Parameters:
- files (list of str): List of paths to FITS files containing labeled regions.
- filesB (list of str): List of paths to background FITS files corresponding to 'files'.
- dx (float): Pixel size in the x-direction (spatial resolution).
- dt (float): Time interval between frames (temporal resolution).
- minliftime (int, optional): Minimum lifetime (number of frames a label persists) to consider. Default is 4.
Returns:
- pd.DataFrame: DataFrame containing tabulated data with columns:
['label', 'Lifetime', 'X', 'Y', 'Area', 'Flux', 'Frames', 'Vx', 'Vy', 'stdVx', 'stdVy'].
Raises:
- ValueError: If there are inconsistencies in the data, such as non-consecutive frames or multiple labels in a group.
Notes:
- The function assumes that 'files' and 'filesB' are aligned, i.e., each entry in 'files' corresponds to the
same index in 'filesB' for background data.
- 'dx' and 'dt' are used to compute velocities ('Vx', 'Vy') based on numerical differentiation of positions ('X', 'Y').
PSA: This docstring has been written with the aid of AI.
"""
df = pandas.DataFrame(columns=["label", "X", "Y", "Area", "Flux", "frame", "ecc"])
img = astropy.io.fits.getdata(filesB[0], memmap=False)
size=numpy.shape(img)
x_1, y_1 = numpy.meshgrid(numpy.arange(size[1]), numpy.arange(size[0]))
for j,file in tqdm.tqdm(enumerate(files), desc="Tabulation"):
src_img = astropy.io.fits.getdata(filesB[j], memmap=False)
asc_img = astropy.io.fits.getdata(file, memmap=False)
unique_ids = numpy.unique(asc_img)
for i in unique_ids:
if i == 0:
continue
mask=(asc_img == i)
Bm=src_img*mask
Area=mask.sum()
Flux=Bm.sum()/Area
X=((mask*x_1)*Bm).sum()/Bm.sum()
Y=((mask*y_1)*Bm).sum()/Bm.sum()
r = numpy.sqrt(Area / numpy.pi)
circle = (x_1 - X)**2 + (y_1 - Y)**2 < r**2
circle = circle.astype(int)
circle = circle * mask
Area_circle = circle.sum()
ecc = Area_circle / Area
temp = pandas.DataFrame([[i, X, Y, Area, Flux, j, ecc]], columns=["label", "X", "Y", "Area", "Flux", "frame", "ecc"])
df = pandas.concat([df, temp], ignore_index=False)
# Merge the common labels
groups = df.groupby("label")
area_tot = []
flux_tot = []
X_tot = []
Y_tot = []
label_tot = []
frame_tot = []
ecc_tot = []
for name, group in tqdm.tqdm(groups, desc="Merging common labels"):
area_temp = group["Area"].values
flux_temp = group["Flux"].values
X_temp = group["X"].values
Y_temp = group["Y"].values
label_temp = group["label"].values
frame_temp = group["frame"].values
ecc_temp = group["ecc"].values
# Perform some sanity checks
if len(area_temp) != len(flux_temp):
raise ValueError("area and flux have different lengths")
if len(area_temp) != len(X_temp):
raise ValueError("area and X have different lengths")
if len(area_temp) != len(Y_temp):
raise ValueError("area and Y have different lengths")
if len(area_temp) != len(label_temp):
raise ValueError("area and label have different lengths")
if len(area_temp) != len(frame_temp):
raise ValueError("area and frame have different lengths")
if len(numpy.unique(label_temp)) > 1:
raise ValueError("More than one label in the group")
if numpy.any(numpy.diff(frame_temp) != 1):
raise ValueError("Frames are not consecutive")
area_tot.append(area_temp)
flux_tot.append(flux_temp)
X_tot.append(X_temp)
Y_tot.append(Y_temp)
label_tot.append(label_temp)
frame_tot.append(frame_temp)
ecc_tot.append(ecc_temp)
df_final = pandas.DataFrame(columns=["label", "Lifetime", "X", "Y", "Area", "Flux", "Frames", "ecc"])
df_final["label"] = [x[0] for x in label_tot]
df_final["Lifetime"] = [len(x) for x in frame_tot]
df_final["X"] = X_tot
df_final["Y"] = Y_tot
df_final["Area"] = area_tot
df_final["Flux"] = flux_tot
df_final["Frames"] = frame_tot
df_final["ecc"] = ecc_tot
df_final = df_final[df_final["Lifetime"] >= minliftime]
# Compute the velocities
vxtot = []
vytot = []
stdvxtot = []
stdvytot = []
for j in tqdm.tqdm(range(len(df_final)), desc="Computing velocities"):
x = df_final["X"].iloc[j]
y = df_final["Y"].iloc[j]
x = numpy.array(x)
y = numpy.array(y)
vx = numpy.gradient(x)*dx/dt
vy = numpy.gradient(y)*dx/dt
stdx = numpy.std(vx)
stdy = numpy.std(vy)
vxtot.append(vx)
vytot.append(vy)
stdvxtot.append(stdx)
stdvytot.append(stdy)
df_final["Vx"] = vxtot
df_final["Vy"] = vytot
df_final["stdVx"] = stdvxtot
df_final["stdVy"] = stdvytot
df_final = df_final.reset_index(drop=True)
return df_final