"""Chunk distribution for multiprocess. (:mod:`qurry.qurrium.utils.chunk`)This module provides functions to calculate chunk size and distribute tasks"""from...tools.parallelmanagerimportDEFAULT_POOL_SIZE,CPU_COUNT
def very_easy_chunk_size(
    tasks_num: int,
    num_process: int = DEFAULT_POOL_SIZE,
    max_chunk_size: int = CPU_COUNT * 4,
) -> int:
    """Calculate the chunk size for multiprocess.

    The chunk size starts at ``tasks_num // num_process + 1``. While that
    value exceeds ``max_chunk_size``, the process count used for the estimate
    is doubled and the chunk size is recomputed.

    Args:
        tasks_num (int): The number of tasks.
        num_process (int, optional): The number of processes.
            Defaults to DEFAULT_POOL_SIZE.
        max_chunk_size (int, optional): The maximum chunk size.
            Defaults to CPU_COUNT * 4.

    Raises:
        ValueError: If ``max_chunk_size`` is less than 1, or if
            ``num_process`` is less than 1 when it is needed for the
            calculation (i.e. ``max_chunk_size`` is greater than 1).

    Returns:
        int: The chunk size.
    """
    if max_chunk_size < 1:
        raise ValueError("max_chunk_size must be greater than 0")
    if max_chunk_size == 1:
        # A cap of 1 always yields a chunk size of 1; num_process is not used.
        return 1
    if num_process < 1:
        # Zero would raise ZeroDivisionError below, and a negative count would
        # silently produce a meaningless (possibly negative) chunk size.
        raise ValueError("num_process must be greater than 0")

    chunks_num = tasks_num // num_process + 1
    while chunks_num > max_chunk_size:
        # Doubling the divisor shrinks the quotient towards 0, so the loop
        # always terminates once chunks_num drops to max_chunk_size or below.
        num_process *= 2
        chunks_num = tasks_num // num_process + 1
    return chunks_num
def very_easy_chunk_distribution(
    respect_memory_array: list[tuple[str, int]],
    num_process: int = DEFAULT_POOL_SIZE,
    max_chunk_size: int = CPU_COUNT * 4,
) -> tuple[int, list[tuple[str, int]], list[list[int]]]:
    """Distribute the chunk for multiprocess.

    The chunk distribution is based on the number of CPU cores.

    The input is treated as rows of ``num_process`` items. Even rows are
    dealt to the processes left-to-right and odd rows right-to-left
    (a serpentine order), so process ``i`` receives the item at position
    ``i`` of every even row and at position ``num_process - 1 - i`` of
    every odd row.

    Args:
        respect_memory_array (list[tuple[str, int]]):
            The array of respect memory.
            Each element is a tuple of (id, memory).
            The id is the ID of the experiment, and the memory is the memory usage.
            The array is sorted by the memory usage.
        num_process (int, optional): The number of processes.
            Defaults to DEFAULT_POOL_SIZE. It is doubled internally until the
            chunk size no longer exceeds ``max_chunk_size``.
        max_chunk_size (int, optional): The maximum chunk size.
            Defaults to CPU_COUNT * 4.

    Raises:
        ValueError: If ``max_chunk_size`` or ``num_process`` is less than 1.

    Returns:
        tuple[int, list[tuple[str, int]], list[list[int]]]:
            A tuple of three values:

            - the chunk size, i.e. the maximum number of items given to
              one process;
            - the (id, memory) tuples reordered so that the items of
              process 0 come first, then those of process 1, and so on;
            - for each process, the list of indices into
              ``respect_memory_array`` that were assigned to it.
    """
    if max_chunk_size < 1:
        raise ValueError("max_chunk_size must be greater than 0")
    if num_process < 1:
        # Zero would raise ZeroDivisionError below, and a negative count would
        # return empty distributions, silently dropping every task.
        raise ValueError("num_process must be greater than 0")

    total = len(respect_memory_array)
    chunks_num = total // num_process + 1
    while chunks_num > max_chunk_size:
        num_process *= 2
        chunks_num = total // num_process + 1

    chunks_sorted_list: list[tuple[str, int]] = []
    distributions: list[list[int]] = [[] for _ in range(num_process)]
    for i in range(num_process):
        for j in range(chunks_num):
            # Serpentine order: even rows run forwards, odd rows backwards.
            if j % 2 == 0:
                idx = j * num_process + i
            else:
                idx = (j + 1) * num_process - i - 1
            # The last row may be only partially filled.
            if idx < total:
                chunks_sorted_list.append(respect_memory_array[idx])
                distributions[i].append(idx)

    return chunks_num, chunks_sorted_list, distributions