1

Trying to upgrade this awesome implementation of gumble-softmax-vae found here. However, I keep getting

TypeError: Cannot convert a symbolic Keras input/output to a numpy array. 

I am stumped - tried many many things. Interestingly some searches return with other implementation of VAEs. I believe the error is somewhere in the "KL" term calculation of the loss.

Here is the almost working code:

import tensorflow as tf
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt


batch_size = 10
data_dim = 784

M = 10  # classes
N = 30  # how many distributions

nb_epoch = 100
epsilon_std = 0.01
anneal_rate = 0.0003
min_temperature = 0.5

tau = tf.Variable(5.0, dtype=tf.float32)


class Sampling(keras.layers.Layer):
    def call(self, logits_y):
        u = tf.random.uniform(tf.shape(logits_y), 0, 1)
        y = logits_y - tf.math.log(
            -tf.math.log(u + 1e-20) + 1e-20
        )  # logits + gumbel noise
        y = tf.nn.softmax(tf.reshape(y, (-1, N, M)) / tau)
        y = tf.reshape(y, (-1, N * M))
        return y


encoder_inputs = keras.Input(shape=(data_dim))
x = keras.layers.Dense(512, activation="relu")(encoder_inputs)
x = keras.layers.Dense(256, activation="relu")(x)
logits_y = keras.layers.Dense(M * N, name="logits_y")(x)
z = Sampling()(logits_y)
encoder = keras.Model(encoder_inputs, z, name="encoder")
encoder.build(encoder_inputs)

print(encoder.summary())

decoder_inputs = keras.Input(shape=(N * M))
x = keras.layers.Dense(256, activation="relu")(decoder_inputs)
x = keras.layers.Dense(512, activation="relu")(x)
decoder_outputs = keras.layers.Dense(data_dim, activation="sigmoid")(x)
decoder = keras.Model(decoder_inputs, decoder_outputs, name="decoder")
decoder.build(decoder_inputs)

print(decoder.summary())


class VAE(keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super(VAE, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.bce = tf.keras.losses.BinaryCrossentropy()
        self.loss_tracker = keras.metrics.Mean(name="loss")

    @property
    def metrics(self):
        return [self.loss_tracker]

    def call(self, x):
        z = self.encoder(x)
        x_hat = self.decoder(z)
        return x_hat

    @tf.function
    def gumbel_loss(self, y_true, y_pred, logits_y):
        q_y = tf.reshape(logits_y, (-1, N, M))
        q_y = tf.nn.softmax(q_y)
        log_q_y = tf.math.log(q_y + 1e-20)
        kl_tmp = q_y * (log_q_y - tf.math.log(1.0 / M))
        kl = tf.math.reduce_sum(kl_tmp, axis=(1, 2))
        kl = tf.squeeze(kl, axis=0)
        elbo = data_dim * self.bce(y_true, y_pred) - kl
        return elbo

    def train_step(self, data):
        x = data

        with tf.GradientTape(persistent=True) as tape:
            z = self.encoder(x, training=True)
            x_hat = self.decoder(z, training=True)

            x = tf.cast(x, dtype=tf.float32)
            x_hat = tf.cast(x_hat, dtype=tf.float32)
            logits_y = self.encoder.get_layer('logits_y').output

            loss = self.gumbel_loss(x, x_hat, logits_y)

        grads = tape.gradient(loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}


def main():

    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
        path="mnist.npz"
    )

    x_train = x_train.astype("float32") / 255.0
    x_test = x_test.astype("float32") / 255.0
    x_train = x_train.reshape((len(x_train), np.prod(x_train.shape[1:])))
    x_test = x_test.reshape((len(x_test), np.prod(x_test.shape[1:])))

    vae = VAE(encoder, decoder, name="vae-model")
    vae_inputs = (None, data_dim)
    vae.build(vae_inputs)
    vae.compile(optimizer="adam", loss=None)
    vae.fit(
        x_train,
        shuffle=True,
        epochs=1,
        batch_size=batch_size
    )

if __name__ == "__main__":
    main()

1 Answer 1

1

I think the main issue occurs when you try to get the output from the logits_y layer, (AFAIK), you can't do that, and instead, you need to build your encoder model with two outputs. Something like this way

class VAE(keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super(VAE, self).__init__(**kwargs)
        # self.encoder = encoder 
        self.encoder = tf.keras.Model(inputs=encoder.input, 
                                      outputs=[encoder.get_layer(name='logits_y').output, 
                                               encoder.output])
        
        whatever...

So, in the training loop, this self.encoder will produce two outputs, one of them is the output of layer logit_y, which you need for some loss function. Lastly, change a few codes in other places for this, as follows

def call(self, x):
        _, z = self.encoder(x)
        x_hat = self.decoder(z)
        return x_hat

@tf.function
    def gumbel_loss(self, y_true, y_pred, logits_y):
        q_y = tf.reshape(logits_y, (-1, N, M))
        q_y = tf.nn.softmax(q_y)
        log_q_y = tf.math.log(q_y + 1e-20)
        kl_tmp = q_y * (log_q_y - tf.math.log(1.0 / M))
        kl = tf.math.reduce_sum(kl_tmp, axis=(1, 2))
        elbo = data_dim * self.bce(y_true, y_pred) - kl
        return elbo

And lastly, the train_step function; note, corresponding variables are already in tf.float32, no need to convert.

   def train_step(self, data):
        x = data
        with tf.GradientTape(persistent=True) as tape:
            logits_y, z = self.encoder(x, training=True)
            x_hat = self.decoder(z, training=True)
            loss = self.gumbel_loss(x, x_hat, logits_y) 
        grads = tape.gradient(loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

You don't need to change anything of the above code now, here is some training logs (running on cpu, tf 2.5).

Epoch 1/5
6000/6000 [==============================] - 60s 10ms/step - loss: 54.4604
Epoch 2/5
6000/6000 [==============================] - 60s 10ms/step - loss: 18.8960
Epoch 3/5
6000/6000 [==============================] - 59s 10ms/step - loss: 12.1036
Epoch 4/5
6000/6000 [==============================] - 59s 10ms/step - loss: 8.5804
Epoch 5/5
6000/6000 [==============================] - 59s 10ms/step - loss: 6.3916
Sign up to request clarification or add additional context in comments.

1 Comment

You the best - it worked. I changed my encoder output to encoder = keras.Model(encoder_inputs, [logits_y, z], name="encoder") - looked cleaner

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.