How to run model.predict with distributed Dask and a pre-trained Keras model?


I am loading my pre-trained Keras model and then trying to parallelize prediction over a large amount of input data using Dask. Unfortunately, I'm running into issues that seem related to how I'm creating my Dask array. Any guidance would be greatly appreciated!


First, I cloned this repo: https://github.com/sanchit2843/dlworkshop.git

Reproducible Code Example:

import warnings

import numpy as np
import pandas as pd
import mlflow
import mlflow.keras
import dask.array as da
from dask.distributed import Client
from keras import backend as K
from keras.layers import Dense
from keras.models import Sequential
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder

warnings.filterwarnings('ignore')

dataset = pd.read_csv('data/train.csv')
X = dataset.drop(['price_range'], axis=1).values
y = dataset[['price_range']].values

# Scale the features and one-hot encode the target
sc = StandardScaler()
X = sc.fit_transform(X)
ohe = OneHotEncoder()
y = ohe.fit_transform(y).toarray()

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# Neural network
model = Sequential()
model.add(Dense(16, input_dim=20, activation="relu"))
model.add(Dense(12, activation="relu"))
model.add(Dense(4, activation="softmax"))
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
model.fit(X_train, y_train, epochs=100, batch_size=64)

# Use dask
client = Client()

def load_and_predict(input_data_chunk):
    # Custom loss needed to deserialize the saved model
    def contrastive_loss(y_true, y_pred):
        margin = 1
        square_pred = K.square(y_pred)
        margin_square = K.square(K.maximum(margin - y_pred, 0))
        return K.mean(y_true * square_pred + (1 - y_true) * margin_square)

    # Load the most recent model logged to MLflow
    mlflow.set_tracking_uri('<uri>')
    mlflow.set_experiment('clean_parties_ml')
    runs = mlflow.search_runs()
    artifact_uri = runs.loc[runs['start_time'].idxmax()]['artifact_uri']
    model = mlflow.keras.load_model(artifact_uri + '/model',
                                    custom_objects={'contrastive_loss': contrastive_loss})
    y_pred = model.predict(input_data_chunk)
    return y_pred

# Split the test set into chunks of 100 rows and predict on each chunk
da_input_data = da.from_array(X_test, chunks=(100, None))
prediction_results = da_input_data.map_blocks(load_and_predict, dtype=X_test.dtype).compute()

The error I'm receiving:

AttributeError: '_thread._local' object has no attribute 'value'

Keras/TensorFlow don't play nicely with other threaded systems, and Dask workers run tasks in a thread pool by default. There is an ongoing issue on this topic here: https://github.com/dask/dask-examples/issues/35
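If threading is indeed the cause, one workaround that is often suggested is to give each Dask worker process a single thread and to load the model lazily inside the worker, once per process. The sketch below is untested against the exact setup in the question. The helper names (get_model, predict_block, predict_in_blocks, make_single_threaded_client) and DummyModel are hypothetical, introduced only for illustration, and load_fn stands for whatever call loads your real model. It also tells map_blocks the output chunk shape explicitly, since the predictions have 4 columns while the input has 20.

```python
import numpy as np
import dask.array as da

_MODEL = None  # per-process cache, so each worker process loads the model once


def get_model(load_fn):
    """Load the model lazily inside the current process and reuse it."""
    global _MODEL
    if _MODEL is None:
        _MODEL = load_fn()
    return _MODEL


def predict_block(block, load_fn=None):
    """Run predict on one chunk; Dask calls this once per block."""
    return get_model(load_fn).predict(block)


def predict_in_blocks(X, load_fn, n_outputs, chunk_rows=100):
    """Build a lazy Dask array of predictions with shape (len(X), n_outputs)."""
    X = np.asarray(X, dtype="float32")
    dx = da.from_array(X, chunks=(chunk_rows, -1))
    return dx.map_blocks(
        predict_block,
        load_fn=load_fn,
        dtype="float32",
        # Row chunks are unchanged; the column count becomes n_outputs.
        chunks=(dx.chunks[0], (n_outputs,)),
        # Supplying meta keeps Dask from test-calling the function on an empty array.
        meta=np.empty((0, 0), dtype="float32"),
    )


def make_single_threaded_client(n_workers=4):
    """Assumed workaround: one thread per worker process."""
    from dask.distributed import Client  # imported lazily; needs dask.distributed
    return Client(n_workers=n_workers, threads_per_worker=1, processes=True)


class DummyModel:
    """Hypothetical stand-in for a Keras model with 4 output classes."""

    def predict(self, block):
        return np.full((block.shape[0], 4), 0.25, dtype="float32")
```

With a real model you would call make_single_threaded_client() first, then pass a loader of your own as load_fn (for example, a function that wraps the mlflow.keras.load_model call from the question) and call .compute() on the result. Whether this avoids the AttributeError depends on your Keras/TensorFlow versions, so treat it as something to try rather than a confirmed fix.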