{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "Model training with training data generated on the fly\n", "=======================================================\n", "\n", "Depending on the use case, data generation with AcouPipe can be fast enough to feed the generated data directly into model training, without saving intermediate files.\n", "\n", "This example demonstrates how to **generate training data on the fly** for the simplest **supervised source localization** tasks.\n", "\n", "Here, a model is trained for **single source localization**, similar to [KHS19], but without predicting the source strength. \n", "For demonstration, the **beamforming map** is created with the calculation `mode=\"wishart\"` (no time data is simulated).\n", "\n", "To prevent thread overloading during parallel data generation, the number of Numba threads was limited by setting the environment variable with `export NUMBA_NUM_THREADS=1` before running the script.\n" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2023-11-14 14:28:06,089\tINFO worker.py:1673 -- Started a local Ray instance.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Num GPUs: 1\n", "Num CPUs: 48\n", "Numba number of concurrent threads: 1\n" ] } ], "source": [ "import os\n", "os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' \n", "import multiprocessing\n", "import numba\n", "import tensorflow as tf\n", "import ray\n", "ray.shutdown() # shutdown existing tasks\n", "ray.init(log_to_driver=False) # start a ray server (without logging the details for clean documentation)\n", "physical_devices = tf.config.list_physical_devices('GPU')\n", "\n", "print(\"Num GPUs:\", len(physical_devices))\n", "print(\"Num CPUs:\", multiprocessing.cpu_count())\n", "print(\"Numba number of concurrent threads:\", numba.config.NUMBA_NUM_THREADS)" ] }, { "cell_type": "markdown", 
"metadata": {}, "source": [ "## Build the dataset generator\n", "\n", "First, we modify the dataset config to create only single-source examples on a coarser grid of size $32 \\times 32$." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/usr/local/lib/python3.8/dist-packages/acoupipe/datasets/features.py:104: Warning: Queried frequency (1000 Hz) not in set of discrete FFT sample frequencies. Using frequency 1071.88 Hz instead.\n", " fidx = [get_frequency_index_range(\n" ] } ], "source": [ "import numpy as np\n", "from acoupipe.datasets.synthetic import DatasetSynthetic\n", "\n", "# create dataset (calculated on a GPU workstation with several CPUs)\n", "dataset = DatasetSynthetic(max_nsources=1, tasks=multiprocessing.cpu_count(), mode='wishart') \n", "\n", "# we manipulate the grid to have a coarser resolution \n", "dataset.config.grid.increment = 1/31 # 32 x 32 grid\n", "\n", "# build TensorFlow datasets for training and validation\n", "training_dataset = dataset.get_tf_dataset(\n", " features=[\"sourcemap\",\"loc\"], f=1000, split=\"training\",size=100000000) # quasi infinite\n", "validation_dataset = dataset.get_tf_dataset(\n", " features=[\"sourcemap\",\"loc\"], f=1000, split=\"validation\",size=100)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The TensorFlow dataset API can be used to build a data pipeline from the data generator. Here, batches with 16 source cases are used. We use the `prefetch` method to generate data during training steps on the GPU, when the CPUs are usually idle. 
For the `validation_dataset`, the `cache` method prevents recalculation of the validation data." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "import tensorflow as tf \n", "\n", "def yield_features_and_labels(data): \n", " feature = data['sourcemap'][0]\n", " # normalize the source map by its maximum value\n", " f_max = tf.reduce_max(feature)\n", " feature /= f_max\n", " # keep only the first two entries of the source location as label\n", " label = data['loc'][:2]\n", " return (feature,label)\n", "\n", "training_dataset = training_dataset.map(yield_features_and_labels).batch(16).prefetch(tf.data.AUTOTUNE)\n", "validation_dataset = validation_dataset.map(yield_features_and_labels).batch(16).cache()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Train the model\n", "\n", "Now, one can build the ResNet50V2 model and fit it to the data. This may take up to a few hours, depending on the computational infrastructure. " ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Epoch 1/5\n", "10000/10000 - 493s - loss: 0.0167 - val_loss: 0.7074 - 493s/epoch - 49ms/step\n", "Epoch 2/5\n", "10000/10000 - 443s - loss: 0.0062 - val_loss: 0.0046 - 443s/epoch - 44ms/step\n", "Epoch 3/5\n", "10000/10000 - 443s - loss: 0.0042 - val_loss: 0.0403 - 443s/epoch - 44ms/step\n", "Epoch 4/5\n", "10000/10000 - 433s - loss: 0.0019 - val_loss: 0.0104 - 433s/epoch - 43ms/step\n", "Epoch 5/5\n", "10000/10000 - 432s - loss: 0.0011 - val_loss: 0.0133 - 432s/epoch - 43ms/step\n" ] }, { "data": { "text/plain": [ "" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# build model architecture\n", "model = tf.keras.Sequential(\n", " tf.keras.applications.resnet_v2.ResNet50V2(\n", " include_top=False,\n", " weights=None,\n", " input_shape=(32,32,1),\n", " ))\n", "model.add(tf.keras.layers.Flatten())\n", "model.add(tf.keras.layers.Dense(2, activation=None))\n", "\n", "# compile and fit\n", 
"model.compile(optimizer=tf.optimizers.Adam(),loss='mse')\n", "model.fit(training_dataset,validation_data=validation_dataset, epochs=5,steps_per_epoch=10000, verbose=2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "After successful training, the model can be used for source localization." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "1/1 [==============================] - 0s 40ms/step\n", "[ 0.23072581 -0.0644986 ]\n" ] } ], "source": [ "dataset.tasks=1\n", "test_dataset = dataset.get_tf_dataset(\n", " features=[\"sourcemap\",\"loc\"], f=1000, split=\"validation\",size=1, start_idx=2) \n", "test_dataset = test_dataset.map(yield_features_and_labels).batch(1)\n", "sourcemap, labels = next(iter(test_dataset))\n", "\n", "prediction = model.predict(sourcemap)[0]\n", "print(prediction)\n", "\n", "sourcemap = sourcemap.numpy().squeeze()\n" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "image/png": "", "text/plain": [ "
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "import matplotlib.pyplot as plt\n", "from acoular import L_p\n", "\n", "extent = dataset.config.grid.extend() \n", "loc = labels[0]\n", "\n", "plt.figure()\n", "plt.imshow(L_p(sourcemap).T,\n", " vmax=L_p(sourcemap.max()),\n", " vmin=L_p(sourcemap.max())-15,\n", " extent=extent,\n", " origin=\"lower\")\n", "plt.plot(prediction[0],prediction[1],'x',label=\"prediction\")\n", "plt.plot(loc[0],loc[1],'x',label=\"label\")\n", "plt.colorbar()\n", "plt.legend()\n", "plt.show()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "vscode": { "interpreter": { "hash": "8b84133aa5d27198834684dc5cf37286f31547fcb562f18c04d9e25d99e7281e" } } }, "nbformat": 4, "nbformat_minor": 4 }