Source code for AERzip.functions

import os
import time

import numpy as np

from AERzip.CompressedFileHeader import CompressedFileHeader


[docs] def saveCompressedFile(addresses, timestamps, file_path, max_address=None, max_timestamp=None, min_timestamp=None, ask=False, overwrite=False, verbose=False): """ This function compresses and saves a compressed file from the provided addresses and timestamps. :param list addresses: A list containing the addresses of the spikes to be stored. :param list timestamps: A list containing the timestamps of the spikes to be stored :param string file_path: A string indicating where the file should be written. :param int max_address: An integer indicating the maximum address of the addresses list. Not required, but can speed up the compression. :param int max_timestamp: An integer indicating the maximum time stamp of the timestamps list. Not required, but can speed up the compression. :param int min_timestamp: An integer indicating the minimum time stamp of the timestamps list. Not required, but can speed up the compression. :param boolean ask: A boolean indicating whether or not to prompt the user to overwrite a file that has been found at the specified path. :param boolean overwrite: A boolean indicating wheter or not a file that has been found at the specified path must be or not be overwritten (if the user is not asked). :param boolean verbose: A boolean indicating whether or not to print information about the process. """ if verbose: start_time = time.time() print("Compressing and storing...") # --- COMPRESS DATA --- # Checking expected sizes n_addresses = len(addresses) n_timestamps = len(timestamps) if n_addresses != n_timestamps: raise ValueError("The size of the address list must be equal to the size of the time stamp list.") # Converting addresses and time stamps numpy arrays new_addresses = np.array(addresses, copy=False) new_timestamps = np.array(timestamps, copy=False) # Method 1) Reducing time stamps if min_timestamp is None: min_timestamp = np.min(timestamps) new_timestamps = new_timestamps - min_timestamp # Method 2) Grouping time stamps by address spikes = {} for address, timestamp_list in zip(new_addresses, new_timestamps): # Note: spikes dictionary could be not sorted spikes.setdefault(address.item(), []).append(timestamp_list.item()) # --- STORE DATA --- # Check the destination folder dst_path = file_path if not os.path.exists(os.path.dirname(file_path)): os.makedirs(os.path.dirname(file_path)) else: # If the destination folder exists, check if the file exists if os.path.exists(dst_path): if ask: print("A file already exists in the specified path.\n" "Do you want to overwrite it? Y/N\n") option = input() while option != "Y" and option != "N": print("Unexpected value. Please, enter 'Y' (overwrite) or 'N' (no overwrite)\n") option = input() else: if overwrite: option = "Y" else: option = "N" if option == "N": split_path = os.path.splitext(dst_path) i = 1 while os.path.exists(dst_path): dst_path = split_path[0] + "_" + str(i) + split_path[1] i += 1 # Calculating bytes required for addresses and timestamps if max_address is None: max_address = np.max(new_addresses) if max_address <= 0xFF: address_bytes = 1 elif max_address <= 0xFFFF: address_bytes = 2 elif max_address <= 0xFFFFFF: address_bytes = 3 elif max_address <= 0xFFFFFFFF: address_bytes = 4 elif max_address <= 0xFFFFFFFFFF: address_bytes = 5 elif max_address <= 0xFFFFFFFFFFFF: address_bytes = 6 elif max_address <= 0xFFFFFFFFFFFFFF: address_bytes = 7 else: address_bytes = 8 if max_timestamp is None: max_timestamp = np.max(new_timestamps) if max_timestamp <= 0xFF: timestamp_bytes = 1 elif max_timestamp <= 0xFFFF: timestamp_bytes = 2 elif max_timestamp <= 0xFFFFFF: timestamp_bytes = 3 elif max_timestamp <= 0xFFFFFFFF: timestamp_bytes = 4 elif max_timestamp <= 0xFFFFFFFFFF: timestamp_bytes = 5 elif max_timestamp <= 0xFFFFFFFFFFFF: timestamp_bytes = 6 elif max_timestamp <= 0xFFFFFFFFFFFFFF: timestamp_bytes = 7 else: timestamp_bytes = 8 # Save the file file = open(dst_path, "wb") header = CompressedFileHeader(address_size=address_bytes, timestamp_size=timestamp_bytes) file.write(header.toBytes()) # Write pairs key-value delimiter = pow(2, 8 * timestamp_bytes) - 1 # Maximum value for time stamps for address, timestamp_list in spikes.items(): # Write the address file.write(address.to_bytes(address_bytes, byteorder="little", signed=False)) # Write the time stamps for timestamp in timestamp_list: file.write(timestamp.to_bytes(timestamp_bytes, byteorder="little", signed=False)) # Write the delimiter # Note: Using timestamo_bytes bytes to represent the delimiter in order to simplify the reading process file.write(delimiter.to_bytes(timestamp_bytes, byteorder="little", signed=False)) file.close() if verbose: end_time = time.time() print("Compressed file saved (took " + '{0:.3f}'.format(end_time - start_time) + " seconds)")
[docs] def loadCompressedFile(file_path, verbose=False): """ This function loads a compressed file and returns the addresses and time stamps stored in it. :param string file_path: A string indicating the path of the compressed file to be loaded. :param boolean verbose: A boolean indicating whether or not to print information about the process. :return: A tuple containing two lists: the addresses and the time stamps stored in the compressed file. :rtype: tuple """ if verbose: start_time = time.time() print("Loading the compressed file...") # Load the compressed file file = open(file_path, "rb") # Read header header = CompressedFileHeader() header.library_version = file.read(header.library_version_bytes).decode("utf-8").strip() header.compressor = file.read(header.compressor_bytes).decode("utf-8").strip() header.address_size = int.from_bytes(file.read(header.address_size_bytes), "little") header.timestamp_size = int.from_bytes(file.read(header.timestamp_size_bytes), "little") header.optional = file.read(header.optional_bytes) header.header_end = file.read(header.header_end_bytes).decode("utf-8").strip() # Read pairs key-value addresses = np.array([]) timestamps = np.array([]) delimiter = pow(2, 8 * header.timestamp_size) - 1 # Maximum value for time stamps # Read addresses and time stamps addresses = [] timestamps = [] while True: address_data = file.read(header.address_size) if not address_data: break if len(address_data) != header.address_size: raise EOFError("Unexpected end of file when reading address data.") address = int.from_bytes(address_data, byteorder="little", signed=False) # Read time stamps until the delimiter is found while True: timestamp_data = file.read(header.timestamp_size) if not timestamp_data or len(timestamp_data) != header.timestamp_size: raise EOFError("Unexpected end of file when reading timestamp data.") timestamp = int.from_bytes(timestamp_data, byteorder="little", signed=False) if timestamp == delimiter: break addresses.append(address) timestamps.append(timestamp) addresses = np.array(addresses, dtype=np.uint64) timestamps = np.array(timestamps, dtype=np.uint64) file.close() if verbose: end_time = time.time() print("Compressed file loaded (took " + '{0:.3f}'.format(end_time - start_time) + " seconds)") return addresses, timestamps