TensorFlow로 멀티 GPU 분산 트레이닝하기
- 원본 링크 : https://keras.io/guides/distributed_training_with_tensorflow/
- 최종 확인 : 2024-11-18
저자 : fchollet
생성일 : 2020/04/28
최종 편집일 : 2023/06/29
설명 : TensorFlow로 Keras 모델을 사용하여, 멀티 GPU 트레이닝을 진행하는 가이드.
소개
일반적으로 여러 디바이스에 계산을 분산시키는 방법에는 두 가지가 있습니다:
- 데이터 병렬 처리
- 데이터 병렬 처리에서는 하나의 모델이 여러 장치나 여러 머신에 복제됩니다.
- 각 장치는 서로 다른 배치의 데이터를 처리한 후, 결과를 병합합니다.
- 이 설정에는 다양한 변형이 있으며, 서로 다른 모델 복제본이 결과를 병합하는 방식이나, 각 배치마다 동기화되는지 여부 등에 차이가 있습니다.
- 모델 병렬 처리
- 모델 병렬 처리에서는 하나의 모델의 다른 부분이 서로 다른 장치에서 실행되어, 하나의 데이터 배치를 함께 처리합니다.
- 이는 여러 가지 브랜치를 특징으로 하는 자연스럽게 병렬화된 아키텍처를 가진 모델에 가장 적합합니다.
이 가이드는 데이터 병렬 처리, 특히 동기식 데이터 병렬 처리에 중점을 둡니다. 여기서 모델의 서로 다른 복제본은 각 배치를 처리한 후 동기화됩니다. 동기화는 모델의 수렴 동작을 단일 장치에서의 트레이닝과 동일하게 유지시킵니다.
특히, 이 가이드는 TensorFlow의
tf.distribute
API를 사용하여,
여러 GPU(보통 2~16개)를 사용하는 동기식 데이터 병렬 처리 방식으로 Keras 모델을 트레이닝하는 방법을 다룹니다.
최소한의 코드 수정으로 여러 GPU가 설치된 단일 머신(단일 호스트, 멀티 디바이스 트레이닝)에서 트레이닝할 수 있습니다.
이는 연구자와 소규모 산업 워크플로우에서 가장 일반적으로 사용되는 설정입니다.
셋업
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
import tensorflow as tf
import keras
단일 호스트, 다중 장치 동기 트레이닝
이 설정에서는, 여러 개의 GPU가 있는 하나의 머신(일반적으로 2~16개의 GPU)에서 트레이닝을 진행합니다. 각 디바이스는 **복제본(replica)**이라고 불리는 모델의 사본을 실행합니다. 간단히 설명하기 위해, 다음 내용에서는 8개의 GPU를 사용하는 것으로 가정하겠습니다. 이는 일반성을 잃지 않습니다.
작동 방식
트레이닝의 각 단계에서:
- 현재 데이터 배치(글로벌 배치)는 8개의 서로 다른 하위 배치(로컬 배치)로 나뉩니다. 예를 들어, 글로벌 배치에 512개의 샘플이 있으면, 8개의 로컬 배치 각각에는 64개의 샘플이 포함됩니다.
- 8개의 복제본 각각은 로컬 배치를 독립적으로 처리합니다: 순전파를 실행한 후 역전파를 수행하여 모델 손실에 대한 가중치의 그래디언트를 출력합니다.
- 로컬 그래디언트로부터 발생한 가중치 업데이트는 8개의 복제본 간에 효율적으로 병합됩니다. 이 병합은 각 스텝이 끝날 때 이루어지기 때문에, 복제본은 항상 동기화된 상태를 유지합니다.
실제로, 모델 복제본의 가중치를 동기화하는 과정은 각 개별 가중치 변수 레벨에서 처리됩니다. 이는 미러드 변수(mirrored variable) 객체를 통해 이루어집니다.
사용 방법
Keras 모델로 단일 호스트, 멀티 디바이스 동기 트레이닝을 수행하려면,
tf.distribute.MirroredStrategy
API를 사용하면 됩니다.
작동 방식은 다음과 같습니다:
MirroredStrategy
를 인스턴스화하고, 선택적으로 사용할 특정 디바이스를 구성할 수 있습니다. (기본적으로는 사용 가능한 모든 GPU를 사용합니다)- strategy 객체를 사용해 scope를 열고, 이 scope 내에서 변수를 포함하는 모든 Keras 객체를 생성합니다.
일반적으로, 모델 생성 및 컴파일은 분산 scope 내에서 이루어져야 합니다.
일부 경우에는,
fit()
호출 시에도 변수가 생성될 수 있으므로,fit()
호출도 scope 내에서 이루어지도록 하는 것이 좋습니다. fit()
을 통해 모델을 트레이닝합니다.
중요한 점으로, 멀티 디바이스 또는 분산 워크플로에서 데이터를 로드하려면,
tf.data.Dataset
객체를 사용하는 것을 권장합니다.
대략적인 흐름은 다음과 같습니다:
# MirroredStrategy 생성
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))
# strategy scope 열기
with strategy.scope():
# 변수를 생성하는 모든 작업은 strategy scope 내에서 이루어져야 합니다.
# 일반적으로 모델 생성 및 `compile()`입니다.
model = Model(...)
model.compile(...)
# 사용 가능한 모든 디바이스에서 모델 트레이닝
model.fit(train_dataset, validation_data=val_dataset, ...)
# 사용 가능한 모든 디바이스에서 모델 평가
model.evaluate(test_dataset)
다음은 실행 가능한 간단한 엔드투엔드 예제입니다:
def get_compiled_model():
# 간단한 2 레이어 Dense 신경망을 만듭니다.
inputs = keras.Input(shape=(784,))
x = keras.layers.Dense(256, activation="relu")(inputs)
x = keras.layers.Dense(256, activation="relu")(x)
outputs = keras.layers.Dense(10)(x)
model = keras.Model(inputs, outputs)
model.compile(
optimizer=keras.optimizers.Adam(),
loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=[keras.metrics.SparseCategoricalAccuracy()],
)
return model
def get_dataset():
batch_size = 32
num_val_samples = 10000
# [`tf.data.Dataset`](https://www.tensorflow.org/api_docs/python/tf/data/Dataset) 형식으로 MNIST 데이터셋을 반환합니다.
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
# 데이터를 전처리합니다. (이들은 Numpy 배열입니다)
x_train = x_train.reshape(-1, 784).astype("float32") / 255
x_test = x_test.reshape(-1, 784).astype("float32") / 255
y_train = y_train.astype("float32")
y_test = y_test.astype("float32")
# num_val_samples 샘플을 검증용으로 예약합니다.
x_val = x_train[-num_val_samples:]
y_val = y_train[-num_val_samples:]
x_train = x_train[:-num_val_samples]
y_train = y_train[:-num_val_samples]
return (
tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(batch_size),
tf.data.Dataset.from_tensor_slices((x_val, y_val)).batch(batch_size),
tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(batch_size),
)
# MirroredStrategy를 생성합니다.
strategy = tf.distribute.MirroredStrategy()
print("Number of devices: {}".format(strategy.num_replicas_in_sync))
# strategy scope를 엽니다.
with strategy.scope():
# 변수를 생성하는 모든 작업은 strategy scope 내에서 이루어져야 합니다.
# 일반적으로 모델 생성 및 `compile()`입니다.
model = get_compiled_model()
# 사용 가능한 모든 디바이스에서 모델을 트레이닝합니다.
train_dataset, val_dataset, test_dataset = get_dataset()
model.fit(train_dataset, epochs=2, validation_data=val_dataset)
# 사용 가능한 모든 디바이스에서 모델을 테스트합니다.
model.evaluate(test_dataset)
결과
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)
Number of devices: 1
Epoch 1/2
1563/1563 ━━━━━━━━━━━━━━━━━━━━ 7s 4ms/step - loss: 0.3830 - sparse_categorical_accuracy: 0.8884 - val_loss: 0.1361 - val_sparse_categorical_accuracy: 0.9574
Epoch 2/2
1563/1563 ━━━━━━━━━━━━━━━━━━━━ 9s 3ms/step - loss: 0.1068 - sparse_categorical_accuracy: 0.9671 - val_loss: 0.0894 - val_sparse_categorical_accuracy: 0.9724
313/313 ━━━━━━━━━━━━━━━━━━━━ 1s 2ms/step - loss: 0.0988 - sparse_categorical_accuracy: 0.9673
콜백을 사용하여 장애 복원력(fault tolerance) 보장하기
분산 트레이닝을 사용할 때, 항상 장애 복원력을 위한 전략을 세워야 합니다.
가장 간단한 방법은 ModelCheckpoint
콜백을 fit()
에 전달하여,
일정 간격마다 모델을 저장하는 것입니다. (예: 매 100 배치마다 또는 매 에포크마다)
이렇게 하면, 저장된 모델에서 트레이닝을 재시작할 수 있습니다.
다음은 간단한 예시입니다:
# 체크포인트를 저장할 디렉토리를 준비합니다.
checkpoint_dir = "./ckpt"
if not os.path.exists(checkpoint_dir):
os.makedirs(checkpoint_dir)
def make_or_restore_model():
# 체크포인트가 있으면 최신 모델을 복원하고,
# 체크포인트가 없으면 새로운 모델을 생성합니다.
checkpoints = [checkpoint_dir + "/" + name for name in os.listdir(checkpoint_dir)]
if checkpoints:
latest_checkpoint = max(checkpoints, key=os.path.getctime)
print("Restoring from", latest_checkpoint)
return keras.models.load_model(latest_checkpoint)
print("Creating a new model")
return get_compiled_model()
def run_training(epochs=1):
# MirroredStrategy를 생성합니다.
strategy = tf.distribute.MirroredStrategy()
# strategy scope를 열고 모델을 생성하거나 복원합니다.
with strategy.scope():
model = make_or_restore_model()
callbacks = [
# 이 콜백은 매 에포크마다 SavedModel을 저장합니다.
# 현재 에포크를 폴더 이름에 포함시킵니다.
keras.callbacks.ModelCheckpoint(
filepath=checkpoint_dir + "/ckpt-{epoch}.keras",
save_freq="epoch",
)
]
model.fit(
train_dataset,
epochs=epochs,
callbacks=callbacks,
validation_data=val_dataset,
verbose=2,
)
# 처음 실행 시 모델을 생성합니다.
run_training(epochs=1)
# 같은 함수를 다시 호출하면 이전 상태에서 재개합니다.
run_training(epochs=1)
결과
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)
Creating a new model
1563/1563 - 7s - 4ms/step - loss: 0.2275 - sparse_categorical_accuracy: 0.9320 - val_loss: 0.1373 - val_sparse_categorical_accuracy: 0.9571
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)
Restoring from ./ckpt/ckpt-1.keras
1563/1563 - 6s - 4ms/step - loss: 0.0944 - sparse_categorical_accuracy: 0.9717 - val_loss: 0.0972 - val_sparse_categorical_accuracy: 0.9710
tf.data
성능 팁
분산 트레이닝을 수행할 때, 데이터를 로드하는 효율성이 매우 중요해질 수 있습니다.
tf.data
파이프라인을 가능한 한 빠르게 실행하는 몇 가지 팁을 소개합니다.
데이터셋 배치에 대한 주의사항
데이터셋을 생성할 때, 글로벌 배치 크기로 배치되었는지 확인하세요. 예를 들어, 8개의 GPU 각각이 64개의 샘플로 구성된 배치를 실행할 수 있는 경우, 글로벌 배치 크기는 512로 설정합니다.
dataset.cache()
호출
데이터셋에서 .cache()
를 호출하면, 첫 번째 반복 이후 데이터가 캐시됩니다.
이후 모든 반복에서는 캐시된 데이터를 사용하게 됩니다.
캐시는 기본적으로 메모리에 저장되며, 또는 사용자가 지정한 로컬 파일에 저장할 수 있습니다.
이 방법은 다음과 같은 경우, 성능을 향상시킬 수 있습니다:
- 데이터가 반복마다 변경되지 않는 경우
- 데이터를 원격 분산 파일 시스템에서 읽어오는 경우
- 데이터를 로컬 디스크에서 읽어오고, 데이터가 메모리에 fit하며, 워크플로우가 주로 IO 바운드인 경우 (예: 이미지 파일 읽기 및 디코딩)
dataset.prefetch(buffer_size)
호출
데이터셋을 생성한 후에는 거의 항상 .prefetch(buffer_size)
를 호출하는 것이 좋습니다.
이 방법은 데이터 파이프라인이 모델과 비동기적으로 실행되도록 하여,
현재 배치가 트레이닝되는 동안 다음 배치의 샘플을 미리 처리하고 버퍼에 저장합니다.
현재 배치가 완료될 때쯤이면 다음 배치가 GPU 메모리로 미리 로드됩니다.
이것이 전부입니다!