.. _sec_utils:
Utility Functions and Classes
=============================
This section contains the implementations of utility functions and
classes used in this book.
.. raw:: html
.. raw:: html
.. raw:: latex
\diilbookstyleinputcell
.. code:: python
def load_array(data_arrays, batch_size, is_train=True):  #@save
    """Wrap tensors in a PyTorch `DataLoader`.

    The tensors in `data_arrays` are zipped into a `TensorDataset`; the
    resulting loader shuffles its examples only when `is_train` is true.
    """
    tensor_dataset = torch.utils.data.TensorDataset(*data_arrays)
    loader = torch.utils.data.DataLoader(
        tensor_dataset, batch_size, shuffle=is_train)
    return loader
def synthetic_data(w, b, num_examples):  #@save
    """Draw a synthetic linear-regression dataset, y = Xw + b + noise.

    Features are sampled from N(0, 1) and the additive label noise from
    N(0, 0.01^2). The labels are returned as a column (shape `(-1, 1)`).
    """
    features = torch.normal(0, 1, (num_examples, len(w)))
    labels = torch.matmul(features, w) + b
    noise = torch.normal(0, 0.01, labels.shape)
    labels = labels + noise
    return features, labels.reshape((-1, 1))
def sgd(params, lr, batch_size):  #@save
    """Apply one minibatch SGD step to every parameter in place.

    Each parameter is moved by `lr * grad / batch_size` and its gradient
    buffer is zeroed afterwards.
    """
    # The update itself must not be tracked by autograd
    with torch.no_grad():
        for p in params:
            p.sub_(lr * p.grad / batch_size)
            p.grad.zero_()
def get_dataloader_workers():  #@save
    """Return the number of worker processes used to read data (always 4)."""
    num_workers = 4
    return num_workers
def load_data_fashion_mnist(batch_size, resize=None):  #@save
    """Fetch Fashion-MNIST and return `(train_loader, test_loader)`.

    When `resize` is truthy, images are resized before being converted to
    tensors. Only the training loader shuffles its examples.
    """
    steps = [transforms.ToTensor()]
    if resize:
        # Resizing has to happen before the conversion to a tensor
        steps = [transforms.Resize(resize)] + steps
    pipeline = transforms.Compose(steps)
    train_set = torchvision.datasets.FashionMNIST(
        root="../data", train=True, transform=pipeline, download=True)
    test_set = torchvision.datasets.FashionMNIST(
        root="../data", train=False, transform=pipeline, download=True)
    train_loader = torch.utils.data.DataLoader(
        train_set, batch_size, shuffle=True,
        num_workers=get_dataloader_workers())
    test_loader = torch.utils.data.DataLoader(
        test_set, batch_size, shuffle=False,
        num_workers=get_dataloader_workers())
    return (train_loader, test_loader)
def evaluate_accuracy_gpu(net, data_iter, device=None):  #@save
    """Return the fraction of correct predictions of `net` over `data_iter`.

    For an `nn.Module` the model is switched to evaluation mode and, when
    no `device` is given, the device of its first parameter is used.
    """
    if isinstance(net, nn.Module):
        net.eval()
        if not device:
            device = next(iter(net.parameters())).device
    # Slot 0: number of correct predictions, slot 1: number of predictions
    totals = d2l.Accumulator(2)
    with torch.no_grad():
        for X, y in data_iter:
            if isinstance(X, list):
                # List inputs are needed for BERT fine-tuning (covered later)
                X = [part.to(device) for part in X]
            else:
                X = X.to(device)
            y = y.to(device)
            totals.add(d2l.accuracy(net(X), y), y.numel())
    return totals[0] / totals[1]
#@save
def train_ch6(net, train_iter, test_iter, num_epochs, lr, device):
    """Train a model with a GPU (defined in Chapter 6).

    Linear and Conv2d layers are re-initialised with Xavier-uniform weights,
    the model is moved to `device` and trained with plain SGD and
    cross-entropy loss. Training loss/accuracy are plotted about five times
    per epoch and test accuracy once per epoch; a summary is printed at the
    end.

    Args:
        net: The `nn.Module` to train (modified in place).
        train_iter: Iterable of `(X, y)` training minibatches; must support
            `len()`.
        test_iter: Iterable of `(X, y)` test minibatches.
        num_epochs: Number of passes over `train_iter`.
        lr: Learning rate for SGD.
        device: Device on which to run training.
    """
    def init_weights(m):
        if type(m) in (nn.Linear, nn.Conv2d):
            nn.init.xavier_uniform_(m.weight)

    net.apply(init_weights)
    print('training on', device)
    net.to(device)
    optimizer = torch.optim.SGD(net.parameters(), lr=lr)
    loss = nn.CrossEntropyLoss()
    animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs],
                            legend=['train loss', 'train acc', 'test acc'])
    timer, num_batches = d2l.Timer(), len(train_iter)
    # Plot about five times per epoch. Clamp to 1 so that an iterator with
    # fewer than five batches does not trigger a modulo-by-zero below.
    plot_every = max(1, num_batches // 5)
    for epoch in range(num_epochs):
        # Sum of training loss, sum of training accuracy, no. of examples
        metric = d2l.Accumulator(3)
        net.train()
        for i, (X, y) in enumerate(train_iter):
            timer.start()
            optimizer.zero_grad()
            X, y = X.to(device), y.to(device)
            y_hat = net(X)
            l = loss(y_hat, y)
            l.backward()
            optimizer.step()
            with torch.no_grad():
                # `l` is multiplied by the batch size so that dividing by
                # the example count yields a per-example loss
                metric.add(l * X.shape[0], d2l.accuracy(y_hat, y), X.shape[0])
            timer.stop()
            train_l = metric[0] / metric[2]
            train_acc = metric[1] / metric[2]
            if (i + 1) % plot_every == 0 or i == num_batches - 1:
                animator.add(epoch + (i + 1) / num_batches,
                             (train_l, train_acc, None))
        test_acc = evaluate_accuracy_gpu(net, test_iter)
        animator.add(epoch + 1, (None, None, test_acc))
    print(f'loss {train_l:.3f}, train acc {train_acc:.3f}, '
          f'test acc {test_acc:.3f}')
    print(f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec '
          f'on {str(device)}')
def show_images(imgs, num_rows, num_cols, titles=None, scale=1.5):  #@save
    """Plot a list of images on a `num_rows` x `num_cols` grid.

    Args:
        imgs: Iterable of images; tensors are converted to NumPy arrays
            when possible, anything else is passed to `imshow` unchanged.
        num_rows: Number of grid rows.
        num_cols: Number of grid columns.
        titles: Optional sequence of per-image titles.
        scale: Size of each grid cell in the figure size passed to
            `subplots`.

    Returns:
        The flattened array of axes.
    """
    figsize = (num_cols * scale, num_rows * scale)
    # squeeze=False always yields a 2-D array of axes, so `flatten()` also
    # works for a 1x1 grid (where a bare Axes would otherwise be returned)
    _, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize,
                               squeeze=False)
    axes = axes.flatten()
    for i, (ax, img) in enumerate(zip(axes, imgs)):
        try:
            img = img.detach().numpy()
        except Exception:
            # Best effort: not a convertible tensor (e.g. a PIL image), so
            # plot it as-is. KeyboardInterrupt/SystemExit still propagate.
            pass
        ax.imshow(img)
        ax.axes.get_xaxis().set_visible(False)
        ax.axes.get_yaxis().set_visible(False)
        if titles:
            ax.set_title(titles[i])
    return axes
def linreg(X, w, b):  #@save
    """Linear regression model: the matrix product `Xw` shifted by `b`."""
    product = torch.matmul(X, w)
    return product + b
def squared_loss(y_hat, y):  #@save
    """Elementwise halved squared error between `y_hat` and `y`.

    `y` is reshaped to the shape of `y_hat` before the subtraction.
    """
    residual = y_hat - y.reshape(y_hat.shape)
    return residual ** 2 / 2
def get_fashion_mnist_labels(labels):  #@save
    """Translate numeric Fashion-MNIST class indices into their text names."""
    names = ('t-shirt', 'trouser', 'pullover', 'dress', 'coat',
             'sandal', 'shirt', 'sneaker', 'bag', 'ankle boot')
    result = []
    for label in labels:
        result.append(names[int(label)])
    return result
class Animator:  #@save
    """Incrementally plot one or more curves and redraw them on each update."""

    def __init__(self, xlabel=None, ylabel=None, legend=None, xlim=None,
                 ylim=None, xscale='linear', yscale='linear',
                 fmts=('-', 'm--', 'g-.', 'r:'), nrows=1, ncols=1,
                 figsize=(3.5, 2.5)):
        legend = [] if legend is None else legend
        d2l.use_svg_display()
        self.fig, self.axes = d2l.plt.subplots(nrows, ncols, figsize=figsize)
        if nrows * ncols == 1:
            # Wrap the lone axes so that `self.axes[0]` always works
            self.axes = [self.axes]

        def configure():
            # Closure over the constructor arguments; re-applied after
            # every redraw because `cla()` clears the axes first
            return d2l.set_axes(self.axes[0], xlabel, ylabel, xlim, ylim,
                                xscale, yscale, legend)

        self.config_axes = configure
        self.X = None
        self.Y = None
        self.fmts = fmts

    def add(self, x, y):
        """Append the point(s) `(x, y)` to the curves and redraw the figure.

        A scalar `y` is treated as a single curve; a scalar `x` is shared
        by all curves. Pairs containing `None` are skipped.
        """
        if not hasattr(y, "__len__"):
            y = [y]
        n = len(y)
        if not hasattr(x, "__len__"):
            x = [x] * n
        if not self.X:
            self.X = [[] for _ in range(n)]
        if not self.Y:
            self.Y = [[] for _ in range(n)]
        for idx, (x_val, y_val) in enumerate(zip(x, y)):
            if x_val is None or y_val is None:
                continue
            self.X[idx].append(x_val)
            self.Y[idx].append(y_val)
        axis = self.axes[0]
        axis.cla()
        for xs, ys, fmt in zip(self.X, self.Y, self.fmts):
            axis.plot(xs, ys, fmt)
        self.config_axes()
        display.display(self.fig)
        display.clear_output(wait=True)
class Accumulator:  #@save
    """Keep running float totals for `n` separate quantities."""

    def __init__(self, n):
        self.data = [0.0 for _ in range(n)]

    def add(self, *args):
        """Add each argument (converted to float) to its matching total."""
        updated = []
        for total, increment in zip(self.data, args):
            updated.append(total + float(increment))
        self.data = updated

    def reset(self):
        """Set every total back to zero."""
        self.data = [0.0 for _ in self.data]

    def __getitem__(self, idx):
        return self.data[idx]
def accuracy(y_hat, y):  #@save
    """Count how many predictions in `y_hat` equal the labels `y`.

    If `y_hat` has more than one column, the index of each row's maximum
    is taken as the predicted class.
    """
    has_class_scores = len(y_hat.shape) > 1 and y_hat.shape[1] > 1
    if has_class_scores:
        y_hat = y_hat.argmax(axis=1)
    matches = y_hat.type(y.dtype) == y
    num_correct = matches.type(y.dtype).sum()
    return float(num_correct)
import hashlib
import os
import tarfile
import zipfile
import requests
def download(url, folder='../data', sha1_hash=None):  #@save
    """Download a file to folder and return the local filepath.

    Args:
        url: An HTTP(S) URL, or (for backward compatibility) a key into
            `DATA_HUB` that maps to a `(url, sha1_hash)` pair.
        folder: Directory in which the file is stored; created if missing.
        sha1_hash: Optional expected SHA-1 hex digest. When given and a
            local file with a matching digest already exists, the download
            is skipped.

    Returns:
        Path of the local file.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    if not url.startswith('http'):
        # For backward compatibility
        url, sha1_hash = DATA_HUB[url]
    os.makedirs(folder, exist_ok=True)
    fname = os.path.join(folder, url.split('/')[-1])
    # Check if hit cache
    if os.path.exists(fname) and sha1_hash:
        sha1 = hashlib.sha1()
        with open(fname, 'rb') as f:
            # Hash in 1 MiB chunks so large files are not read at once
            for chunk in iter(lambda: f.read(1048576), b''):
                sha1.update(chunk)
        if sha1.hexdigest() == sha1_hash:
            return fname
    # Download
    print(f'Downloading {fname} from {url}...')
    with requests.get(url, stream=True, verify=True) as r:
        # Fail loudly instead of saving an HTTP error page as the dataset
        r.raise_for_status()
        with open(fname, 'wb') as f:
            # Stream to disk; `r.content` would buffer the whole body
            for chunk in r.iter_content(chunk_size=1048576):
                f.write(chunk)
    return fname
def extract(filename, folder=None):  #@save
    """Extract a zip/tar file into folder.

    Args:
        filename: Path of the archive; its extension must be `.zip`,
            `.tar` or `.gz`.
        folder: Destination directory. Defaults to the directory that
            contains `filename`.

    Raises:
        AssertionError: If the extension is not supported.
    """
    base_dir = os.path.dirname(filename)
    _, ext = os.path.splitext(filename)
    if ext not in ('.zip', '.tar', '.gz'):
        # Raised explicitly (rather than via `assert`) so the check is not
        # stripped under `python -O`; the exception type is unchanged.
        raise AssertionError('Only support zip/tar files.')
    if folder is None:
        folder = base_dir
    opener = zipfile.ZipFile if ext == '.zip' else tarfile.open
    # NOTE(review): members are extracted without path sanitisation here;
    # only use this on archives from a trusted source.
    with opener(filename, 'r') as fp:  # ensures the archive is closed
        fp.extractall(folder)
def download_extract(name, folder=None):  #@save
    """Download and extract a zip/tar file.

    Args:
        name: Passed to `download` (a URL or data-hub key).
        folder: Optional sub-directory name, joined to the archive's
            directory to form the returned path.

    Returns:
        `os.path.join(base_dir, folder)` if `folder` is given, otherwise
        the archive path with its last extension removed.

    Raises:
        AssertionError: If the downloaded file is not a zip/tar archive.
    """
    fname = download(name)
    base_dir = os.path.dirname(fname)
    data_dir, ext = os.path.splitext(fname)
    if ext == '.zip':
        opener = zipfile.ZipFile
    elif ext in ('.tar', '.gz'):
        opener = tarfile.open
    else:
        # Raised explicitly so the check survives `python -O`, where an
        # `assert False` would vanish and leave the archive undefined.
        raise AssertionError('Only zip/tar files can be extracted.')
    with opener(fname, 'r') as fp:  # ensures the archive is closed
        fp.extractall(base_dir)
    return os.path.join(base_dir, folder) if folder else data_dir
def tokenize(lines, token='word'):  #@save
    """Break each text line into whitespace-separated words or characters."""
    assert token in ('word', 'char'), 'Unknown token type: ' + token
    if token == 'word':
        return [line.split() for line in lines]
    return [list(line) for line in lines]
def evaluate_loss(net, data_iter, loss):  #@save
    """Return the mean of `loss` for `net` over every element of `data_iter`."""
    # Slot 0: sum of losses, slot 1: number of loss elements
    totals = d2l.Accumulator(2)
    for X, y in data_iter:
        out = net(X)
        # Labels are reshaped to match the model output before the loss
        l = loss(out, y.reshape(out.shape))
        totals.add(l.sum(), l.numel())
    return totals[0] / totals[1]
def grad_clipping(net, theta):  #@save
    """Rescale gradients in place so their joint L2 norm is at most `theta`.

    For an `nn.Module` the trainable parameters are used; otherwise the
    parameters are read from `net.params`.
    """
    if isinstance(net, nn.Module):
        params = [p for p in net.parameters() if p.requires_grad]
    else:
        params = net.params
    squared_total = sum(torch.sum(p.grad ** 2) for p in params)
    norm = torch.sqrt(squared_total)
    if norm > theta:
        for p in params:
            p.grad[:] *= theta / norm
.. raw:: html
.. raw:: html
.. raw:: latex
\diilbookstyleinputcell
.. code:: python
def load_array(data_arrays, batch_size, is_train=True):  #@save
    """Wrap arrays in a Gluon `DataLoader`.

    The arrays in `data_arrays` are combined into an `ArrayDataset`; the
    loader shuffles its examples only when `is_train` is true.
    """
    array_dataset = gluon.data.ArrayDataset(*data_arrays)
    loader = gluon.data.DataLoader(array_dataset, batch_size,
                                   shuffle=is_train)
    return loader
def synthetic_data(w, b, num_examples):  #@save
    """Draw a synthetic linear-regression dataset, y = Xw + b + noise.

    Features are sampled from N(0, 1) and the additive label noise from
    N(0, 0.01^2). The labels are returned as a column (shape `(-1, 1)`).
    """
    features = np.random.normal(0, 1, (num_examples, len(w)))
    labels = np.dot(features, w) + b
    noise = np.random.normal(0, 0.01, labels.shape)
    labels = labels + noise
    return features, labels.reshape((-1, 1))
def sgd(params, lr, batch_size):  #@save
    """Apply one minibatch SGD step to every parameter in place."""
    for p in params:
        step = lr * p.grad / batch_size
        # Slice assignment overwrites the existing array instead of
        # rebinding the name
        p[:] = p - step
def get_dataloader_workers():  #@save
    """Return the number of data-reading processes: 0 on Windows, else 4."""
    if sys.platform.startswith('win'):
        return 0
    return 4
def load_data_fashion_mnist(batch_size, resize=None):  #@save
    """Fetch Fashion-MNIST and return `(train_loader, test_loader)`.

    When `resize` is truthy, images are resized before being converted to
    tensors. Only the training loader shuffles its examples.
    """
    vision = gluon.data.vision
    steps = [vision.transforms.ToTensor()]
    if resize:
        # Resizing has to happen before the conversion to a tensor
        steps = [vision.transforms.Resize(resize)] + steps
    pipeline = vision.transforms.Compose(steps)
    train_set = vision.FashionMNIST(train=True).transform_first(pipeline)
    test_set = vision.FashionMNIST(train=False).transform_first(pipeline)
    train_loader = gluon.data.DataLoader(
        train_set, batch_size, shuffle=True,
        num_workers=get_dataloader_workers())
    test_loader = gluon.data.DataLoader(
        test_set, batch_size, shuffle=False,
        num_workers=get_dataloader_workers())
    return (train_loader, test_loader)
def evaluate_accuracy_gpu(net, data_iter, device=None):  #@save
    """Return the fraction of correct predictions of `net` over `data_iter`.

    When no `device` is given, the first context of the model's first
    parameter is used.
    """
    if not device:
        first_param = list(net.collect_params().values())[0]
        device = first_param.list_ctx()[0]
    # Slot 0: number of correct predictions, slot 1: number of predictions
    totals = d2l.Accumulator(2)
    for X, y in data_iter:
        X = X.as_in_ctx(device)
        y = y.as_in_ctx(device)
        totals.add(d2l.accuracy(net(X), y), d2l.size(y))
    return totals[0] / totals[1]
#@save
def train_ch6(net, train_iter, test_iter, num_epochs, lr, device):
    """Train a model with a GPU (defined in Chapter 6).

    The model is (re-)initialised with Xavier weights on `device` and
    trained with SGD and softmax cross-entropy loss. Training loss/accuracy
    are plotted about five times per epoch and test accuracy once per
    epoch; a summary is printed at the end.

    Args:
        net: The Gluon block to train (modified in place).
        train_iter: Iterable of `(X, y)` training minibatches; must support
            `len()`.
        test_iter: Iterable of `(X, y)` test minibatches.
        num_epochs: Number of passes over `train_iter`.
        lr: Learning rate for SGD.
        device: Context on which to run training.
    """
    net.initialize(force_reinit=True, ctx=device, init=init.Xavier())
    loss = gluon.loss.SoftmaxCrossEntropyLoss()
    trainer = gluon.Trainer(net.collect_params(),
                            'sgd', {'learning_rate': lr})
    animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs],
                            legend=['train loss', 'train acc', 'test acc'])
    timer, num_batches = d2l.Timer(), len(train_iter)
    # Plot about five times per epoch. Clamp to 1 so that an iterator with
    # fewer than five batches does not trigger a modulo-by-zero below.
    plot_every = max(1, num_batches // 5)
    for epoch in range(num_epochs):
        # Sum of training loss, sum of training accuracy, no. of examples
        metric = d2l.Accumulator(3)
        for i, (X, y) in enumerate(train_iter):
            timer.start()
            # Here is the major difference from `d2l.train_epoch_ch3`
            X, y = X.as_in_ctx(device), y.as_in_ctx(device)
            with autograd.record():
                y_hat = net(X)
                l = loss(y_hat, y)
            l.backward()
            trainer.step(X.shape[0])
            metric.add(l.sum(), d2l.accuracy(y_hat, y), X.shape[0])
            timer.stop()
            train_l = metric[0] / metric[2]
            train_acc = metric[1] / metric[2]
            if (i + 1) % plot_every == 0 or i == num_batches - 1:
                animator.add(epoch + (i + 1) / num_batches,
                             (train_l, train_acc, None))
        test_acc = evaluate_accuracy_gpu(net, test_iter)
        animator.add(epoch + 1, (None, None, test_acc))
    print(f'loss {train_l:.3f}, train acc {train_acc:.3f}, '
          f'test acc {test_acc:.3f}')
    print(f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec '
          f'on {str(device)}')
def grad_clipping(net, theta):  #@save
    """Rescale gradients in place so their joint L2 norm is at most `theta`.

    For a `gluon.Block` the data of every collected parameter is used;
    otherwise the parameters are read from `net.params`.
    """
    if isinstance(net, gluon.Block):
        params = [p.data() for p in net.collect_params().values()]
    else:
        params = net.params
    squared_total = sum((p.grad ** 2).sum() for p in params)
    norm = math.sqrt(squared_total)
    if norm > theta:
        for p in params:
            p.grad[:] *= theta / norm
def evaluate_accuracy(net, data_iter):  #@save
    """Return the fraction of correct predictions of `net` over `data_iter`."""
    # Slot 0: number of correct predictions, slot 1: number of predictions
    totals = Accumulator(2)
    for X, y in data_iter:
        num_correct = accuracy(net(X), y)
        totals.add(num_correct, d2l.size(y))
    return totals[0] / totals[1]
def show_images(imgs, num_rows, num_cols, titles=None, scale=1.5):  #@save
    """Plot a list of images on a `num_rows` x `num_cols` grid.

    Args:
        imgs: Iterable of images; objects offering `asnumpy()` are
            converted, anything else is passed to `imshow` unchanged.
        num_rows: Number of grid rows.
        num_cols: Number of grid columns.
        titles: Optional sequence of per-image titles.
        scale: Size of each grid cell in the figure size passed to
            `subplots`.

    Returns:
        The flattened array of axes.
    """
    figsize = (num_cols * scale, num_rows * scale)
    # squeeze=False always yields a 2-D array of axes, so `flatten()` also
    # works for a 1x1 grid (where a bare Axes would otherwise be returned)
    _, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize,
                               squeeze=False)
    axes = axes.flatten()
    for i, (ax, img) in enumerate(zip(axes, imgs)):
        try:
            img = img.asnumpy()
        except Exception:
            # Best effort: not a convertible array, so plot it as-is.
            # KeyboardInterrupt/SystemExit still propagate.
            pass
        ax.imshow(img)
        ax.axes.get_xaxis().set_visible(False)
        ax.axes.get_yaxis().set_visible(False)
        if titles:
            ax.set_title(titles[i])
    return axes
def linreg(X, w, b):  #@save
    """Linear regression model: the dot product of `X` and `w` plus `b`."""
    product = np.dot(X, w)
    return product + b
def squared_loss(y_hat, y):  #@save
    """Elementwise halved squared error between `y_hat` and `y`.

    `y` is reshaped to the shape of `y_hat` before the subtraction.
    """
    residual = y_hat - y.reshape(y_hat.shape)
    return residual ** 2 / 2
def get_fashion_mnist_labels(labels):  #@save
    """Translate numeric Fashion-MNIST class indices into their text names."""
    names = ('t-shirt', 'trouser', 'pullover', 'dress', 'coat',
             'sandal', 'shirt', 'sneaker', 'bag', 'ankle boot')
    result = []
    for label in labels:
        result.append(names[int(label)])
    return result
class Animator:  #@save
    """Incrementally plot one or more curves and redraw them on each update."""

    def __init__(self, xlabel=None, ylabel=None, legend=None, xlim=None,
                 ylim=None, xscale='linear', yscale='linear',
                 fmts=('-', 'm--', 'g-.', 'r:'), nrows=1, ncols=1,
                 figsize=(3.5, 2.5)):
        legend = [] if legend is None else legend
        d2l.use_svg_display()
        self.fig, self.axes = d2l.plt.subplots(nrows, ncols, figsize=figsize)
        if nrows * ncols == 1:
            # Wrap the lone axes so that `self.axes[0]` always works
            self.axes = [self.axes]

        def configure():
            # Closure over the constructor arguments; re-applied after
            # every redraw because `cla()` clears the axes first
            return d2l.set_axes(self.axes[0], xlabel, ylabel, xlim, ylim,
                                xscale, yscale, legend)

        self.config_axes = configure
        self.X = None
        self.Y = None
        self.fmts = fmts

    def add(self, x, y):
        """Append the point(s) `(x, y)` to the curves and redraw the figure.

        A scalar `y` is treated as a single curve; a scalar `x` is shared
        by all curves. Pairs containing `None` are skipped.
        """
        if not hasattr(y, "__len__"):
            y = [y]
        n = len(y)
        if not hasattr(x, "__len__"):
            x = [x] * n
        if not self.X:
            self.X = [[] for _ in range(n)]
        if not self.Y:
            self.Y = [[] for _ in range(n)]
        for idx, (x_val, y_val) in enumerate(zip(x, y)):
            if x_val is None or y_val is None:
                continue
            self.X[idx].append(x_val)
            self.Y[idx].append(y_val)
        axis = self.axes[0]
        axis.cla()
        for xs, ys, fmt in zip(self.X, self.Y, self.fmts):
            axis.plot(xs, ys, fmt)
        self.config_axes()
        display.display(self.fig)
        display.clear_output(wait=True)
class Accumulator:  #@save
    """Keep running float totals for `n` separate quantities."""

    def __init__(self, n):
        self.data = [0.0 for _ in range(n)]

    def add(self, *args):
        """Add each argument (converted to float) to its matching total."""
        updated = []
        for total, increment in zip(self.data, args):
            updated.append(total + float(increment))
        self.data = updated

    def reset(self):
        """Set every total back to zero."""
        self.data = [0.0 for _ in self.data]

    def __getitem__(self, idx):
        return self.data[idx]
def accuracy(y_hat, y):  #@save
    """Count how many predictions in `y_hat` equal the labels `y`.

    If `y_hat` has more than one column, the index of each row's maximum
    is taken as the predicted class.
    """
    has_class_scores = len(y_hat.shape) > 1 and y_hat.shape[1] > 1
    if has_class_scores:
        y_hat = y_hat.argmax(axis=1)
    matches = y_hat.astype(y.dtype) == y
    num_correct = matches.astype(y.dtype).sum()
    return float(num_correct)
import hashlib
import os
import tarfile
import zipfile
import requests
def download(url, folder='../data', sha1_hash=None):  #@save
    """Download a file to folder and return the local filepath.

    Args:
        url: An HTTP(S) URL, or (for backward compatibility) a key into
            `DATA_HUB` that maps to a `(url, sha1_hash)` pair.
        folder: Directory in which the file is stored; created if missing.
        sha1_hash: Optional expected SHA-1 hex digest. When given and a
            local file with a matching digest already exists, the download
            is skipped.

    Returns:
        Path of the local file.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    if not url.startswith('http'):
        # For backward compatibility
        url, sha1_hash = DATA_HUB[url]
    os.makedirs(folder, exist_ok=True)
    fname = os.path.join(folder, url.split('/')[-1])
    # Check if hit cache
    if os.path.exists(fname) and sha1_hash:
        sha1 = hashlib.sha1()
        with open(fname, 'rb') as f:
            # Hash in 1 MiB chunks so large files are not read at once
            for chunk in iter(lambda: f.read(1048576), b''):
                sha1.update(chunk)
        if sha1.hexdigest() == sha1_hash:
            return fname
    # Download
    print(f'Downloading {fname} from {url}...')
    with requests.get(url, stream=True, verify=True) as r:
        # Fail loudly instead of saving an HTTP error page as the dataset
        r.raise_for_status()
        with open(fname, 'wb') as f:
            # Stream to disk; `r.content` would buffer the whole body
            for chunk in r.iter_content(chunk_size=1048576):
                f.write(chunk)
    return fname
def extract(filename, folder=None):  #@save
    """Extract a zip/tar file into folder.

    Args:
        filename: Path of the archive; its extension must be `.zip`,
            `.tar` or `.gz`.
        folder: Destination directory. Defaults to the directory that
            contains `filename`.

    Raises:
        AssertionError: If the extension is not supported.
    """
    base_dir = os.path.dirname(filename)
    _, ext = os.path.splitext(filename)
    if ext not in ('.zip', '.tar', '.gz'):
        # Raised explicitly (rather than via `assert`) so the check is not
        # stripped under `python -O`; the exception type is unchanged.
        raise AssertionError('Only support zip/tar files.')
    if folder is None:
        folder = base_dir
    opener = zipfile.ZipFile if ext == '.zip' else tarfile.open
    # NOTE(review): members are extracted without path sanitisation here;
    # only use this on archives from a trusted source.
    with opener(filename, 'r') as fp:  # ensures the archive is closed
        fp.extractall(folder)
def download_extract(name, folder=None):  #@save
    """Download and extract a zip/tar file.

    Args:
        name: Passed to `download` (a URL or data-hub key).
        folder: Optional sub-directory name, joined to the archive's
            directory to form the returned path.

    Returns:
        `os.path.join(base_dir, folder)` if `folder` is given, otherwise
        the archive path with its last extension removed.

    Raises:
        AssertionError: If the downloaded file is not a zip/tar archive.
    """
    fname = download(name)
    base_dir = os.path.dirname(fname)
    data_dir, ext = os.path.splitext(fname)
    if ext == '.zip':
        opener = zipfile.ZipFile
    elif ext in ('.tar', '.gz'):
        opener = tarfile.open
    else:
        # Raised explicitly so the check survives `python -O`, where an
        # `assert False` would vanish and leave the archive undefined.
        raise AssertionError('Only zip/tar files can be extracted.')
    with opener(fname, 'r') as fp:  # ensures the archive is closed
        fp.extractall(base_dir)
    return os.path.join(base_dir, folder) if folder else data_dir
def tokenize(lines, token='word'):  #@save
    """Break each text line into whitespace-separated words or characters."""
    assert token in ('word', 'char'), 'Unknown token type: ' + token
    if token == 'word':
        return [line.split() for line in lines]
    return [list(line) for line in lines]
def evaluate_loss(net, data_iter, loss):  #@save
    """Return the mean of `loss` for `net` over every element of `data_iter`."""
    # Slot 0: sum of losses, slot 1: number of loss elements
    totals = d2l.Accumulator(2)
    for X, y in data_iter:
        batch_loss = loss(net(X), y)
        totals.add(batch_loss.sum(), d2l.size(batch_loss))
    return totals[0] / totals[1]
.. raw:: html
.. raw:: html
.. raw:: latex
\diilbookstyleinputcell
.. code:: python
def show_images(imgs, num_rows, num_cols, titles=None, scale=1.5):  #@save
    """Plot a list of images on a `num_rows` x `num_cols` grid.

    Args:
        imgs: Iterable of images; each is converted with `np.asarray`
            when possible, otherwise passed to `imshow` unchanged.
        num_rows: Number of grid rows.
        num_cols: Number of grid columns.
        titles: Optional sequence of per-image titles.
        scale: Size of each grid cell in the figure size passed to
            `subplots`.

    Returns:
        The flattened array of axes.
    """
    figsize = (num_cols * scale, num_rows * scale)
    # squeeze=False always yields a 2-D array of axes, so `flatten()` also
    # works for a 1x1 grid (where a bare Axes would otherwise be returned)
    _, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize,
                               squeeze=False)
    axes = axes.flatten()
    for i, (ax, img) in enumerate(zip(axes, imgs)):
        try:
            img = np.asarray(img)
        except Exception:
            # Best effort: conversion failed, so plot the object as-is.
            # KeyboardInterrupt/SystemExit still propagate.
            pass
        ax.imshow(img)
        ax.axes.get_xaxis().set_visible(False)
        ax.axes.get_yaxis().set_visible(False)
        if titles:
            ax.set_title(titles[i])
    return axes
import hashlib
import os
import tarfile
import zipfile
import requests
def download(url, folder='../data', sha1_hash=None):  #@save
    """Download a file to folder and return the local filepath.

    Args:
        url: An HTTP(S) URL, or (for backward compatibility) a key into
            `DATA_HUB` that maps to a `(url, sha1_hash)` pair.
        folder: Directory in which the file is stored; created if missing.
        sha1_hash: Optional expected SHA-1 hex digest. When given and a
            local file with a matching digest already exists, the download
            is skipped.

    Returns:
        Path of the local file.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    if not url.startswith('http'):
        # For backward compatibility
        url, sha1_hash = DATA_HUB[url]
    os.makedirs(folder, exist_ok=True)
    fname = os.path.join(folder, url.split('/')[-1])
    # Check if hit cache
    if os.path.exists(fname) and sha1_hash:
        sha1 = hashlib.sha1()
        with open(fname, 'rb') as f:
            # Hash in 1 MiB chunks so large files are not read at once
            for chunk in iter(lambda: f.read(1048576), b''):
                sha1.update(chunk)
        if sha1.hexdigest() == sha1_hash:
            return fname
    # Download
    print(f'Downloading {fname} from {url}...')
    with requests.get(url, stream=True, verify=True) as r:
        # Fail loudly instead of saving an HTTP error page as the dataset
        r.raise_for_status()
        with open(fname, 'wb') as f:
            # Stream to disk; `r.content` would buffer the whole body
            for chunk in r.iter_content(chunk_size=1048576):
                f.write(chunk)
    return fname
def extract(filename, folder=None):  #@save
    """Extract a zip/tar file into folder.

    Args:
        filename: Path of the archive; its extension must be `.zip`,
            `.tar` or `.gz`.
        folder: Destination directory. Defaults to the directory that
            contains `filename`.

    Raises:
        AssertionError: If the extension is not supported.
    """
    base_dir = os.path.dirname(filename)
    _, ext = os.path.splitext(filename)
    if ext not in ('.zip', '.tar', '.gz'):
        # Raised explicitly (rather than via `assert`) so the check is not
        # stripped under `python -O`; the exception type is unchanged.
        raise AssertionError('Only support zip/tar files.')
    if folder is None:
        folder = base_dir
    opener = zipfile.ZipFile if ext == '.zip' else tarfile.open
    # NOTE(review): members are extracted without path sanitisation here;
    # only use this on archives from a trusted source.
    with opener(filename, 'r') as fp:  # ensures the archive is closed
        fp.extractall(folder)
.. raw:: html
.. raw:: html
.. raw:: latex
\diilbookstyleinputcell
.. code:: python
def load_array(data_arrays, batch_size, is_train=True):  #@save
    """Build a batched `tf.data.Dataset` from `data_arrays`.

    When `is_train` is true the examples are shuffled (buffer size 1000)
    before batching.
    """
    ds = tf.data.Dataset.from_tensor_slices(data_arrays)
    if is_train:
        ds = ds.shuffle(buffer_size=1000)
    return ds.batch(batch_size)
def synthetic_data(w, b, num_examples):  #@save
    """Draw a synthetic linear-regression dataset, y = Xw + b + noise.

    Features are standard normal; the label noise has standard deviation
    0.01. The labels are returned as a column (shape `(-1, 1)`).
    """
    features = tf.zeros((num_examples, w.shape[0]))
    features += tf.random.normal(shape=features.shape)
    weights_col = tf.reshape(w, (-1, 1))
    labels = tf.matmul(features, weights_col) + b
    labels += tf.random.normal(shape=labels.shape, stddev=0.01)
    return features, tf.reshape(labels, (-1, 1))
def sgd(params, grads, lr, batch_size):  #@save
    """Apply one minibatch SGD step, pairing each parameter with its gradient."""
    for p, g in zip(params, grads):
        step = lr * g / batch_size
        p.assign_sub(step)
def load_data_fashion_mnist(batch_size, resize=None):  #@save
    """Load Fashion-MNIST and return `(train_dataset, test_dataset)`.

    Pixel values are divided by 255, a trailing axis is appended to the
    images and labels are cast to int32. When `resize` is truthy, batches
    are resized (with padding) to `resize` x `resize`.
    """
    mnist_train, mnist_test = tf.keras.datasets.fashion_mnist.load_data()

    def process(X, y):
        # Scale pixels into [0, 1], append a trailing axis, cast labels
        return (tf.expand_dims(X, axis=3) / 255,
                tf.cast(y, dtype='int32'))

    def resize_fn(X, y):
        return (
            tf.image.resize_with_pad(X, resize, resize) if resize else X, y)

    train_ds = tf.data.Dataset.from_tensor_slices(process(*mnist_train))
    # Batches are formed first and then shuffled, as in the original order
    train_ds = train_ds.batch(batch_size).shuffle(
        len(mnist_train[0])).map(resize_fn)
    test_ds = tf.data.Dataset.from_tensor_slices(process(*mnist_test))
    test_ds = test_ds.batch(batch_size).map(resize_fn)
    return (train_ds, test_ds)
class TrainCallback(tf.keras.callbacks.Callback):  #@save
    """A callback to visualize the training progress.

    After every epoch it evaluates `net` on `test_iter` and plots training
    loss, training accuracy and test accuracy; after the last epoch it
    prints the final metrics and the training throughput.
    """
    def __init__(self, net, train_iter, test_iter, num_epochs, device_name):
        # Let the Keras base class set up its own state first
        super().__init__()
        self.timer = d2l.Timer()
        self.animator = d2l.Animator(
            xlabel='epoch', xlim=[1, num_epochs], legend=[
                'train loss', 'train acc', 'test acc'])
        self.net = net
        self.train_iter = train_iter
        self.test_iter = test_iter
        self.num_epochs = num_epochs
        self.device_name = device_name

    def on_epoch_begin(self, epoch, logs=None):
        """Start timing the epoch."""
        self.timer.start()

    def on_epoch_end(self, epoch, logs):
        """Stop the timer, evaluate on the test set and update the plot."""
        self.timer.stop()
        test_acc = self.net.evaluate(
            self.test_iter, verbose=0, return_dict=True)['accuracy']
        metrics = (logs['loss'], logs['accuracy'], test_acc)
        self.animator.add(epoch + 1, metrics)
        if epoch == self.num_epochs - 1:
            # The example count is estimated as (size of the first batch)
            # x (number of batches)
            batch_size = next(iter(self.train_iter))[0].shape[0]
            num_examples = batch_size * tf.data.experimental.cardinality(
                self.train_iter).numpy()
            print(f'loss {metrics[0]:.3f}, train acc {metrics[1]:.3f}, '
                  f'test acc {metrics[2]:.3f}')
            print(f'{num_examples / self.timer.avg():.1f} examples/sec on '
                  f'{str(self.device_name)}')
#@save
def train_ch6(net_fn, train_iter, test_iter, num_epochs, lr, device):
    """Build, compile and fit a Keras model on one device (Chapter 6).

    `net_fn` is called inside a `OneDeviceStrategy` scope to create the
    model, which is trained with SGD and sparse categorical cross-entropy
    on logits. Progress is reported through `TrainCallback`. Returns the
    trained model.
    """
    device_name = device._device_name
    strategy = tf.distribute.OneDeviceStrategy(device_name)
    with strategy.scope():
        sgd_optimizer = tf.keras.optimizers.SGD(learning_rate=lr)
        loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(
            from_logits=True)
        net = net_fn()
        net.compile(optimizer=sgd_optimizer, loss=loss_fn,
                    metrics=['accuracy'])
    progress = TrainCallback(net, train_iter, test_iter, num_epochs,
                             device_name)
    net.fit(train_iter, epochs=num_epochs, verbose=0, callbacks=[progress])
    return net
def evaluate_accuracy(net, data_iter):  #@save
    """Return the fraction of correct predictions of `net` over `data_iter`."""
    # Slot 0: number of correct predictions, slot 1: number of predictions
    totals = Accumulator(2)
    for X, y in data_iter:
        num_correct = accuracy(net(X), y)
        totals.add(num_correct, d2l.size(y))
    return totals[0] / totals[1]
def show_images(imgs, num_rows, num_cols, titles=None, scale=1.5):  #@save
    """Plot a list of images on a `num_rows` x `num_cols` grid.

    Args:
        imgs: Iterable of images; objects offering `numpy()` are
            converted, anything else is passed to `imshow` unchanged.
        num_rows: Number of grid rows.
        num_cols: Number of grid columns.
        titles: Optional sequence of per-image titles.
        scale: Size of each grid cell in the figure size passed to
            `subplots`.

    Returns:
        The flattened array of axes.
    """
    figsize = (num_cols * scale, num_rows * scale)
    # squeeze=False always yields a 2-D array of axes, so `flatten()` also
    # works for a 1x1 grid (where a bare Axes would otherwise be returned)
    _, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize,
                               squeeze=False)
    axes = axes.flatten()
    for i, (ax, img) in enumerate(zip(axes, imgs)):
        try:
            img = img.numpy()
        except Exception:
            # Best effort: not a convertible tensor, so plot it as-is.
            # KeyboardInterrupt/SystemExit still propagate.
            pass
        ax.imshow(img)
        ax.axes.get_xaxis().set_visible(False)
        ax.axes.get_yaxis().set_visible(False)
        if titles:
            ax.set_title(titles[i])
    return axes
def linreg(X, w, b):  #@save
    """Linear regression model: the matrix product `Xw` shifted by `b`."""
    product = tf.matmul(X, w)
    return product + b
def squared_loss(y_hat, y):  #@save
    """Elementwise halved squared error between `y_hat` and `y`.

    `y` is reshaped to the shape of `y_hat` before the subtraction.
    """
    residual = y_hat - tf.reshape(y, y_hat.shape)
    return residual ** 2 / 2
def get_fashion_mnist_labels(labels):  #@save
    """Translate numeric Fashion-MNIST class indices into their text names."""
    names = ('t-shirt', 'trouser', 'pullover', 'dress', 'coat',
             'sandal', 'shirt', 'sneaker', 'bag', 'ankle boot')
    result = []
    for label in labels:
        result.append(names[int(label)])
    return result
class Animator:  #@save
    """Incrementally plot one or more curves and redraw them on each update."""

    def __init__(self, xlabel=None, ylabel=None, legend=None, xlim=None,
                 ylim=None, xscale='linear', yscale='linear',
                 fmts=('-', 'm--', 'g-.', 'r:'), nrows=1, ncols=1,
                 figsize=(3.5, 2.5)):
        legend = [] if legend is None else legend
        d2l.use_svg_display()
        self.fig, self.axes = d2l.plt.subplots(nrows, ncols, figsize=figsize)
        if nrows * ncols == 1:
            # Wrap the lone axes so that `self.axes[0]` always works
            self.axes = [self.axes]

        def configure():
            # Closure over the constructor arguments; re-applied after
            # every redraw because `cla()` clears the axes first
            return d2l.set_axes(self.axes[0], xlabel, ylabel, xlim, ylim,
                                xscale, yscale, legend)

        self.config_axes = configure
        self.X = None
        self.Y = None
        self.fmts = fmts

    def add(self, x, y):
        """Append the point(s) `(x, y)` to the curves and redraw the figure.

        A scalar `y` is treated as a single curve; a scalar `x` is shared
        by all curves. Pairs containing `None` are skipped.
        """
        if not hasattr(y, "__len__"):
            y = [y]
        n = len(y)
        if not hasattr(x, "__len__"):
            x = [x] * n
        if not self.X:
            self.X = [[] for _ in range(n)]
        if not self.Y:
            self.Y = [[] for _ in range(n)]
        for idx, (x_val, y_val) in enumerate(zip(x, y)):
            if x_val is None or y_val is None:
                continue
            self.X[idx].append(x_val)
            self.Y[idx].append(y_val)
        axis = self.axes[0]
        axis.cla()
        for xs, ys, fmt in zip(self.X, self.Y, self.fmts):
            axis.plot(xs, ys, fmt)
        self.config_axes()
        display.display(self.fig)
        display.clear_output(wait=True)
class Accumulator:  #@save
    """Keep running float totals for `n` separate quantities."""

    def __init__(self, n):
        self.data = [0.0 for _ in range(n)]

    def add(self, *args):
        """Add each argument (converted to float) to its matching total."""
        updated = []
        for total, increment in zip(self.data, args):
            updated.append(total + float(increment))
        self.data = updated

    def reset(self):
        """Set every total back to zero."""
        self.data = [0.0 for _ in self.data]

    def __getitem__(self, idx):
        return self.data[idx]
def accuracy(y_hat, y):  #@save
    """Count how many predictions in `y_hat` equal the labels `y`.

    If `y_hat` has more than one column, the index of each row's maximum
    is taken as the predicted class.
    """
    has_class_scores = len(y_hat.shape) > 1 and y_hat.shape[1] > 1
    if has_class_scores:
        y_hat = tf.argmax(y_hat, axis=1)
    matches = tf.cast(y_hat, y.dtype) == y
    num_correct = tf.reduce_sum(tf.cast(matches, y.dtype))
    return float(num_correct)
import hashlib
import os
import tarfile
import zipfile
import requests
def download(url, folder='../data', sha1_hash=None):  #@save
    """Download a file to folder and return the local filepath.

    Args:
        url: An HTTP(S) URL, or (for backward compatibility) a key into
            `DATA_HUB` that maps to a `(url, sha1_hash)` pair.
        folder: Directory in which the file is stored; created if missing.
        sha1_hash: Optional expected SHA-1 hex digest. When given and a
            local file with a matching digest already exists, the download
            is skipped.

    Returns:
        Path of the local file.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    if not url.startswith('http'):
        # For backward compatibility
        url, sha1_hash = DATA_HUB[url]
    os.makedirs(folder, exist_ok=True)
    fname = os.path.join(folder, url.split('/')[-1])
    # Check if hit cache
    if os.path.exists(fname) and sha1_hash:
        sha1 = hashlib.sha1()
        with open(fname, 'rb') as f:
            # Hash in 1 MiB chunks so large files are not read at once
            for chunk in iter(lambda: f.read(1048576), b''):
                sha1.update(chunk)
        if sha1.hexdigest() == sha1_hash:
            return fname
    # Download
    print(f'Downloading {fname} from {url}...')
    with requests.get(url, stream=True, verify=True) as r:
        # Fail loudly instead of saving an HTTP error page as the dataset
        r.raise_for_status()
        with open(fname, 'wb') as f:
            # Stream to disk; `r.content` would buffer the whole body
            for chunk in r.iter_content(chunk_size=1048576):
                f.write(chunk)
    return fname
def extract(filename, folder=None):  #@save
    """Extract a zip/tar file into folder.

    Args:
        filename: Path of the archive; its extension must be `.zip`,
            `.tar` or `.gz`.
        folder: Destination directory. Defaults to the directory that
            contains `filename`.

    Raises:
        AssertionError: If the extension is not supported.
    """
    base_dir = os.path.dirname(filename)
    _, ext = os.path.splitext(filename)
    if ext not in ('.zip', '.tar', '.gz'):
        # Raised explicitly (rather than via `assert`) so the check is not
        # stripped under `python -O`; the exception type is unchanged.
        raise AssertionError('Only support zip/tar files.')
    if folder is None:
        folder = base_dir
    opener = zipfile.ZipFile if ext == '.zip' else tarfile.open
    # NOTE(review): members are extracted without path sanitisation here;
    # only use this on archives from a trusted source.
    with opener(filename, 'r') as fp:  # ensures the archive is closed
        fp.extractall(folder)
def download_extract(name, folder=None):  #@save
    """Download and extract a zip/tar file.

    Args:
        name: Passed to `download` (a URL or data-hub key).
        folder: Optional sub-directory name, joined to the archive's
            directory to form the returned path.

    Returns:
        `os.path.join(base_dir, folder)` if `folder` is given, otherwise
        the archive path with its last extension removed.

    Raises:
        AssertionError: If the downloaded file is not a zip/tar archive.
    """
    fname = download(name)
    base_dir = os.path.dirname(fname)
    data_dir, ext = os.path.splitext(fname)
    if ext == '.zip':
        opener = zipfile.ZipFile
    elif ext in ('.tar', '.gz'):
        opener = tarfile.open
    else:
        # Raised explicitly so the check survives `python -O`, where an
        # `assert False` would vanish and leave the archive undefined.
        raise AssertionError('Only zip/tar files can be extracted.')
    with opener(fname, 'r') as fp:  # ensures the archive is closed
        fp.extractall(base_dir)
    return os.path.join(base_dir, folder) if folder else data_dir
def tokenize(lines, token='word'):  #@save
    """Break each text line into whitespace-separated words or characters."""
    assert token in ('word', 'char'), 'Unknown token type: ' + token
    if token == 'word':
        return [line.split() for line in lines]
    return [list(line) for line in lines]
def evaluate_loss(net, data_iter, loss):  #@save
    """Return the mean of `loss` for `net` over every element of `data_iter`."""
    # Slot 0: sum of losses, slot 1: number of loss elements
    totals = d2l.Accumulator(2)
    for X, y in data_iter:
        batch_loss = loss(net(X), y)
        totals.add(tf.reduce_sum(batch_loss), d2l.size(batch_loss))
    return totals[0] / totals[1]
def grad_clipping(grads, theta):  #@save
    """Return a list of gradients whose joint L2 norm is at most `theta`.

    `tf.IndexedSlices` gradients are first converted to dense tensors.
    If the norm does not exceed `theta` the (dense) gradients are returned
    unscaled.
    """
    theta = tf.constant(theta, dtype=tf.float32)
    dense_grads = [
        tf.convert_to_tensor(g) if isinstance(g, tf.IndexedSlices) else g
        for g in grads]
    squared_total = sum((tf.reduce_sum(g ** 2)).numpy()
                        for g in dense_grads)
    norm = tf.cast(tf.math.sqrt(squared_total), tf.float32)
    if tf.greater(norm, theta):
        dense_grads = [g * theta / norm for g in dense_grads]
    return dense_grads
.. raw:: html
.. raw:: html
More for the attention chapter.
.. raw:: html