ytbilly3636’s 研究備忘録

機械学習,Python,ガンダムなど

Chainerで畳み込みのAutoencoderを実装した

こんにちは

ChainerのReference ManualにDeconvolution2Dというクラスが有るのを見つけました.

Standard Link implementations — Chainer 2.0.0 documentation

Deconvolutionってなんぞと思い,調べると以下のページが見つかりました.

qiita.com

特徴マップを拡大して畳み込むことで,畳み込む前の画像を復元するイメージでしょうか. ということは,

Input -> Convolution -> Feature Maps -> Deconvolution -> Output

という処理を行えば, Autoencoder畳み込みバージョンができるのではないかと思い試してみました. いわゆる,Convolutional Autoencoderはこのような実装なんですかね. 調べていないのでなんとも言えませんが.

環境

項目 詳細
OS Ubuntu 14.04
GPU GTX 970
Ver Chainer 1.18.0

コード

可視化のためOpenCVとmatplotlibを用いています. 高速化のためGPUを用いています.

# -*- coding: utf-8 -*-

import sys, os, cv2
import numpy as np
import matplotlib.pyplot as plt
import chainer
import chainer.links as L
import chainer.functions as F
from chainer import cuda, Variable, optimizers, Chain, datasets

# 畳み込みAutoencoderの定義
class CAE(Chain):
    # 入力は3チャンネルのカラー画像を想定しているので入出力を3チャンネルに
    def __init__(self, num_filter, size_filter):
        super(CAE, self).__init__(
            conv1 = L.Convolution2D(3, num_filter, size_filter),
            dcnv1 = L.Deconvolution2D(num_filter, 3, size_filter),
        )
        return
    
    # 活性化関数はReLU    
    def __call__(self, x):
        f1 = F.relu(self.conv1(x))
        f2 = F.relu(self.dcnv1(f1))
        return f2

    # 損失関数はL2ノルムとした  
    def loss(self, x):
        return F.mean_squared_error(self.__call__(x), x)


# 最適化手法:Adam        
cae = CAE(num_filter=16, size_filter=5)
opt = optimizers.Adam()
opt.setup(cae)
cae.to_gpu()


# CIFAR10を学習
train, test = datasets.get_cifar10()


# 1epoch分の学習
def train_epoch(batch_size):
    # グラフ描画用のarray
    x_graph = np.arange(0, train._length / batch_size)
    y_graph = np.zeros(train._length / batch_size)

    # batch_size分の画像をtrain._length/batch_size回に分けてミニバッチ学習
    perm = np.random.permutation(train._length)
    for i in xrange(0, train._length, batch_size):
        x_batch = Variable(cuda.to_gpu(cuda.cupy.asarray(train[perm[i:i+batch_size]][0], dtype=cuda.cupy.float32)))
        
        # 損失の計算・逆伝播・更新
        cae.zerograds()
        loss = cae.loss(x_batch)
        loss.backward()
        opt.update()
        
        # 損失をグラフに描画
        y_graph[i / batch_size] = loss.data
        plt.clf()
        plt.title('loss')
        plt.plot(x_graph, y_graph)
        plt.pause(.01)
        
        # test[0][0]をエンコードしてデコードしたものを可視化
        encode_and_decode(number=0)
    return


# エンコードしてデコードしたものを表示
def encode_and_decode(number):
    x = Variable(cuda.to_gpu(cuda.cupy.asarray(test[number][0], dtype=cuda.cupy.float32).reshape(1, 3, 32, 32)))   
    y = cae(x)

    # OpenCVの形式に合わせる
    x_data = cv2.cvtColor(np.rollaxis(cuda.to_cpu(x.data).reshape(3, 32, 32), 0, 3), cv2.COLOR_RGB2BGR)
    y_data = cv2.cvtColor(np.rollaxis(cuda.to_cpu(y.data).reshape(3, 32, 32), 0, 3), cv2.COLOR_RGB2BGR)

    cv2.imshow("origin:" + str(number), cv2.resize(x_data, (200, 200)))
    cv2.imshow("result:" + str(number), cv2.resize(y_data, (200, 200)))
    cv2.waitKey(1)
    return


# 実行    
train_epoch(batch_size=100)

更新とともに,画像が復元できていくのを確認できると思います.

f:id:ytbilly3636:20170514211629j:plain
学習回数:0

f:id:ytbilly3636:20170514211718j:plain
学習回数:5000

f:id:ytbilly3636:20170514211738j:plain
学習回数:45000

ちなみに入力画像はこんな感じでした.
f:id:ytbilly3636:20170514211824j:plain

簡単な解説

CAEクラス

今回の一番のポイントはこのクラスです.

class CAE(Chain):
    def __init__(self, num_filter, size_filter):
        super(CAE, self).__init__(
            conv1 = L.Convolution2D(3, num_filter, size_filter),
            dcnv1 = L.Deconvolution2D(num_filter, 3, size_filter),
        )
        return
    
    def __call__(self, x):
        f1 = F.relu(self.conv1(x))
        f2 = F.relu(self.dcnv1(f1))
        return f2

    def loss(self, x):
        return F.mean_squared_error(self.__call__(x), x)

L.Convolution2D,及びL.Deconvolution2Dの引数は順に,入力の枚数,特徴マップ(出力)の数,フィルターのサイズです. 今回の場合3チャンネルの画像を畳み込んで16枚の特徴マップを出力し,16枚の特徴マップを逆畳み込みして3枚の出力, すなわち3チャネルのカラー画像を出力するようにしています.

損失関数はネットワークの出力と教師信号(すなわち入力画像)とのL2ノルムで求めています. これはF.mean_squared_errorを使えば簡単に求めることができます.

学習

1バッチ分の学習は以下のコードによって行われます.

cae.zerograds()
loss = cae.loss(x_batch)
loss.backward()
opt.update()

損失lossを求めて,それを逆伝播させて,optimizersで更新するだけです.

1バッチ分の入力はどうやって作ってもよいと思いますが, この方法が一番簡単かと思います.

# 0~49999の数字をランダムに並び替えた配列を用意
perm = np.random.permutation(train._length)

# permの0~99番目,100~199番目,…というように選んでいく
for i in xrange(0, train._length, batch_size):
    x_batch = Variable(cuda.to_gpu(cuda.cupy.asarray(train[perm[i:i+batch_size]][0], dtype=cuda.cupy.float32)))
    

CPU版の場合,cuda.cupyをnpに変えてcuda.to_gpuを削除したらよいと思います.

可視化

私はここで少し詰まりました. 今回GPUを使っているので,入力データであるxと出力データであるyは(多分)GPUのメモリにあります. これをnumpy.arrayとして扱いたい場合は一旦cuda.to_cpuをかましてやる必要があります.

x = Variable(cuda.to_gpu(cuda.cupy.asarray(test[number][0], dtype=cuda.cupy.float32).reshape(1, 3, 32, 32)))   
y = cae(x)
x_data = cv2.cvtColor(np.rollaxis(cuda.to_cpu(x.data).reshape(3, 32, 32), 0, 3), cv2.COLOR_RGB2BGR)
y_data = cv2.cvtColor(np.rollaxis(cuda.to_cpu(y.data).reshape(3, 32, 32), 0, 3), cv2.COLOR_RGB2BGR)