[https://docs.dask.org/en/stable/ Dask] is a versatile library for Python. It provides distributed NumPy arrays and Pandas DataFrames, enabling distributed computation in pure Python with access to the PyData stack.

= Installing the wheel =

The best option is to install with [https://pythonwheels.com/ Python wheels], as follows:

::1. [[Utiliser_des_modules#Sous-commande_load|Load a Python module]] with module load python.
::2. Create and activate a [[Python/fr#Créer_et_utiliser_un_environnement_virtuel|virtual environment]].
::3. In the virtual environment, use pip install to install dask and, optionally, dask-distributed.

:{{Command|prompt=(venv) [name@server ~]|pip install --no-index dask distributed}}

= Submitting a job =

== Single node ==

The following example starts a Dask cluster on a single node with 6 CPUs and computes the mean of a column over the entire data set.

{{File
|name=dask-example.sh
|lang="bash"
|contents=
#!/bin/bash
#SBATCH --account=
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=6
#SBATCH --mem=8000M
#SBATCH --time=0-00:05
#SBATCH --output=%N-%j.out

module load python gcc arrow

# Build the virtual environment on the compute node's local storage
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install --no-index dask distributed pandas

export DASK_SCHEDULER_ADDR=$(hostname)
export DASK_SCHEDULER_PORT=$((30000 + $RANDOM % 10000)) # pick a random port between 30000 and 39999

dask scheduler --host $DASK_SCHEDULER_ADDR --port $DASK_SCHEDULER_PORT &

dask worker "tcp://$DASK_SCHEDULER_ADDR:$DASK_SCHEDULER_PORT" --no-dashboard --nworkers=6 \
--nthreads=1 --local-directory=$SLURM_TMPDIR &

sleep 10

python dask-example.py
}}

This script spawns a Dask cluster with as many worker processes as there are cores in the job, each worker process running a single thread. To decide on the right balance between processes and threads for your workload, see [https://distributed.dask.org/en/stable/efficiency.html?highlight=workers%20threads#adjust-between-threads-and-processes the official Dask documentation].

Here, the Pandas dataframe is split into 6 chunks, so each worker process handles one chunk with one CPU:

{{File
|name=dask-example.py
|lang="python"
|contents=
import pandas as pd
import numpy as np
from dask import dataframe as dd
from dask.distributed import Client
import os

n_workers = int(os.environ['SLURM_CPUS_PER_TASK'])

client = Client(f"tcp://{os.environ['DASK_SCHEDULER_ADDR']}:{os.environ['DASK_SCHEDULER_PORT']}")

index = pd.date_range("2021-09-01", periods=2400, freq="1H")
df = pd.DataFrame({"a": np.arange(2400)}, index=index)
ddf = dd.from_pandas(df, npartitions=n_workers)  # split the pandas dataframe into "n_workers" chunks

result = ddf.a.mean().compute()
print(f"The mean is {result}")
}}
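The job script above relies on a fixed sleep 10 to give the workers time to connect, which is not guaranteed to be long enough. As a minimal sketch (the filename and the 60-second timeout are illustrative choices, not requirements), the Python script can instead block until the expected number of workers has registered with the scheduler, using the wait_for_workers method of dask.distributed.Client:

{{File
|name=wait-for-workers-example.py
|lang="python"
|contents=
import os
from dask.distributed import Client

n_workers = int(os.environ['SLURM_CPUS_PER_TASK'])
client = Client(f"tcp://{os.environ['DASK_SCHEDULER_ADDR']}:{os.environ['DASK_SCHEDULER_PORT']}")

# Block until all expected workers have registered with the scheduler;
# raises an error if they have not all connected within 60 seconds.
client.wait_for_workers(n_workers, timeout=60)

# List the addresses of the connected workers as a sanity check.
print(list(client.scheduler_info()['workers']))
}}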
== Multiple nodes ==

In the next example, we reuse the single-node case, but this time with a Dask cluster spanning two nodes with 6 CPUs each. We also launch two worker processes per node, each with three cores.

{{File
|name=dask-example.sh
|lang="bash"
|contents=
#!/bin/bash
#SBATCH --nodes 2
#SBATCH --tasks-per-node=2
#SBATCH --mem=16000M
#SBATCH --cpus-per-task=3
#SBATCH --time=0-00:30
#SBATCH --output=%N-%j.out
#SBATCH --account=

module load python arrow

export DASK_SCHEDULER_ADDR=$(hostname)
export DASK_SCHEDULER_PORT=34567

srun -N 2 -n 2 config_virtualenv.sh # set both -N and -n to the number of nodes

source $SLURM_TMPDIR/env/bin/activate

dask scheduler --host $DASK_SCHEDULER_ADDR --port $DASK_SCHEDULER_PORT &
sleep 10

srun launch_dask_workers.sh &
dask_cluster_pid=$!
sleep 10

python test_dask.py

kill $dask_cluster_pid # shut down the Dask workers after the python process exits
}}

where the script config_virtualenv.sh is

{{File
|name=config_virtualenv.sh
|lang="bash"
|contents=
#!/bin/bash

echo "From node ${SLURM_NODEID}: installing virtualenv..."

module load python gcc arrow
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate

pip install --no-index 'dask[distributed,dataframe]'

echo "Done installing virtualenv!"
deactivate
}}

and the script launch_dask_workers.sh is

{{File
|name=launch_dask_workers.sh
|lang="bash"
|contents=
#!/bin/bash

source $SLURM_TMPDIR/env/bin/activate

SCHEDULER_CONNECTION_STRING="tcp://$DASK_SCHEDULER_ADDR:$DASK_SCHEDULER_PORT"

if [[ "$SLURM_PROCID" -eq "0" ]]; then
    ## On the SLURM task with rank 0, where the Dask scheduler process has already been
    ## launched, we launch a smaller worker, with 40% of the job's memory, and we subtract
    ## one core from the task to leave it for the scheduler.
    DASK_WORKER_MEM=0.4
    DASK_WORKER_THREADS=$(($SLURM_CPUS_PER_TASK - 1))
else
    ## On all other SLURM tasks, each worker gets half of the job's allocated memory
    ## and all the cores allocated to its task.
    DASK_WORKER_MEM=0.5
    DASK_WORKER_THREADS=$SLURM_CPUS_PER_TASK
fi

echo "Starting a dask worker with $DASK_WORKER_THREADS threads..."

## The worker runs in the foreground so that the srun task stays alive for the
## lifetime of the cluster.
dask worker "$SCHEDULER_CONNECTION_STRING" --no-dashboard --nworkers=1 \
--nthreads=$DASK_WORKER_THREADS --memory-limit=$DASK_WORKER_MEM --local-directory=$SLURM_TMPDIR
}}

Finally, the script test_dask.py is

{{File
|name=test_dask.py
|lang="python"
|contents=
import pandas as pd
import numpy as np
from dask import dataframe as dd
from dask.distributed import Client
import os

client = Client(f"tcp://{os.environ['DASK_SCHEDULER_ADDR']}:{os.environ['DASK_SCHEDULER_PORT']}")

index = pd.date_range("2021-09-01", periods=2400, freq="1H")
df = pd.DataFrame({"a": np.arange(2400)}, index=index)
ddf = dd.from_pandas(df, npartitions=6)  # one partition per worker process in the cluster

result = ddf.a.mean().compute()
print(f"The mean is {result}")
}}
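To check how the work is actually spread across the cluster, the same pattern can be extended. The sketch below is illustrative rather than part of the recipe above (the filename inspect_dask.py is hypothetical); it assumes the same DASK_SCHEDULER_ADDR and DASK_SCHEDULER_PORT environment variables and uses two standard Dask dataframe operations: map_partitions, to report the size of each chunk, and resample, for a per-day aggregation executed in parallel across the partitions.

{{File
|name=inspect_dask.py
|lang="python"
|contents=
import pandas as pd
import numpy as np
from dask import dataframe as dd
from dask.distributed import Client
import os

client = Client(f"tcp://{os.environ['DASK_SCHEDULER_ADDR']}:{os.environ['DASK_SCHEDULER_PORT']}")

index = pd.date_range("2021-09-01", periods=2400, freq="1H")
df = pd.DataFrame({"a": np.arange(2400)}, index=index)
ddf = dd.from_pandas(df, npartitions=6)

# One value per partition: how many rows each worker-side chunk holds.
print(ddf.map_partitions(len).compute())

# A per-day mean computed in parallel across the partitions; the 2400 hourly
# samples collapse into 100 daily values.
daily_mean = ddf.a.resample("1D").mean().compute()
print(daily_mean.head())
}}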