Create user minibatch sources

In order to make use of CNTK’s (distributed) training functionality, one has to provide input data as an instance of MinibatchSource. In CNTK, there are a variety of means to provide minibatch sources:

  • (best) convert the data to the formats of the built-in data readers - they support rich randomization and packing functionality with high performance (see How to feed data and cntk.io);
  • (preferred) if the data is hard to convert but fits in memory, use MinibatchSourceFromData (a minimal sketch follows this list);
  • if the data does not fit in memory and you want fine-grained control over how minibatches are created, implement the abstract UserMinibatchSource interface.
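
For reference, here is a minimal sketch of the in-memory option. The arrays X and Y below are toy data made up for illustration only; they are not used elsewhere in this manual.

import numpy as np
from cntk.io import MinibatchSourceFromData

X = np.arange(12, dtype=np.float32).reshape(4, 3)    # 4 samples with 3 features each
Y = np.asarray([0, 1, 2, 3], dtype=np.float32).reshape(4, 1)
source = MinibatchSourceFromData(dict(x=X, y=Y), max_samples=len(X))
mb = source.next_minibatch(2)                        # request a minibatch of 2 samples
print(mb[source.streams['x']].num_samples)           # expected: 2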

This manual explains the last approach: how to create a user minibatch source in Python.

User minibatch sources

A minibatch source is responsible for providing:

  1. meta-information regarding the data, such as storage format, data type, shape of elements,
  2. batches of data, and
  3. auxiliary information for advanced features, such as the checkpoint state of the current data access position, so that an interrupted learning process can be resumed from the position where it was interrupted.

Correspondingly, a minibatch source API needs to implement the following four methods (see UserMinibatchSource for details; a minimal skeleton follows this list):

  1. stream_infos(): Returns a list of StreamInformation instances. Each piece of stream information contains the meta information regarding a stream of the data: e.g. storage format, data type, shape of elements (see StreamInformation for details)
  2. next_minibatch(num_samples, number_of_workers, worker_rank, device=None): Returns the next minibatch of data as specified by the given parameters:
    • num_samples: the number of samples that are being requested
    • number_of_workers: the number of workers in a distributed training session; outside a distributed training setting, this number is always 1
    • worker_rank: the rank identifying the specific worker that requests this minibatch in a distributed training setting; if this is not a distributed training session, the worker rank is always 0 (the first worker)
    • device: a device descriptor specifying which device the minibatch data should be copied to, e.g. cntk.device.cpu() or cntk.device.gpu(device_id) (see DeviceDescriptor for details)
  3. get_checkpoint_state(): Returns a dictionary which describes the current state of the minibatch source
  4. restore_from_checkpoint(state): Sets the state of the minibatch source according to a checkpoint state object. This allows the minibatch source to resume data feeding from the position where the checkpoint was saved. Note that state is the dictionary returned by get_checkpoint_state().
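
Before diving into the details, here is a bare skeleton showing just these four method signatures. The class name MySkeletonSource is ours, for orientation only; the actual implementation is developed step by step below.

from cntk.io import UserMinibatchSource

class MySkeletonSource(UserMinibatchSource):
    def stream_infos(self):
        # 1. Describe each data stream (name, unique ID, storage format, dtype, shape).
        raise NotImplementedError()

    def next_minibatch(self, num_samples, number_of_workers, worker_rank, device=None):
        # 2. Return a dictionary mapping each StreamInformation to a MinibatchData instance.
        raise NotImplementedError()

    def get_checkpoint_state(self):
        # 3. Return a dictionary capturing the current data access position.
        raise NotImplementedError()

    def restore_from_checkpoint(self, state):
        # 4. Restore the data access position from a state returned by get_checkpoint_state().
        raise NotImplementedError()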

Now let's go through the implementation of these four methods step by step.

User minibatch source step by step

In the following example, we will detail the steps on how to implement the UserMinibatchSource interface.

First, let’s import the necessary packages:

In [1]:
import numpy as np
import cntk as C
from cntk.io import UserMinibatchSource, StreamInformation, MinibatchData

Secondly, let's assume that we have a data set in the following tab-separated text format:

  • The first column is the sequence ID: e.g. 0 is the ID for sequence 0, and 1 is the ID for sequence 1.
  • The second column starts with the symbol "|". It is the feature named 'x', which is a sparse representation of the words in our training data.
  • The third column again starts with the symbol "|". It is our label named 'y', which is the one-hot representation of the label.

In the following, our toy data set contains 4 sequences:

In [2]:
sample_data = r'''0      |x 560:1        |y 1 0 0 0 0
0   |x 0:1
0   |x 0:1
1   |x 560:1        |y 0 1 0 0 0
1   |x 0:1
1   |x 0:1
1   |x 424:1
2   |x 160:1        |y 0 0 1 0 0
2   |x 5:1
2   |x 6:1
3   |x 460:1        |y 0 0 0 1 0
3   |x 3:1
3   |x 3:1
3   |x 425:1
'''

Inherit UserMinibatchSource to create your user minibatch class:

To implement our example user minibatch source, we first prepare the data access and its meta information:

  1. Parse the text formatted data into an intermediate representation so that we can access the data by their sequence indices:

    features = self.data[seq_idx]['features']
    labels = self.data[seq_idx]['labels']
    

    This is done by creating a private method _prepare_data() in the example below. We omit the implementation details of the text format parsing here, as they are irrelevant to understanding the UserMinibatchSource interface. However, the parsing mechanism should keep track of the current data access point so that the data feeding process can be restored at any point. In the example, we track the sequence index.

  2. Define the meta information of the data: e.g.

    self.fsi = StreamInformation("features", 0, 'sparse', np.float32, (self.f_dim,))
    self.lsi = StreamInformation("labels", 1, 'dense', np.float32, (self.l_dim,))
    

    The self.fsi and self.lsi define the meta information (see StreamInformation for the definition) regarding the features and labels, respectively. For example, StreamInformation("features", 0, 'sparse', np.float32, (self.f_dim,)) specifies that:

    1. the "features" data stream is identified by ID 0 (it is required that every data stream is identified by a unique ID),
    2. it is sparse,
    3. its data type is np.float32, and
    4. its dimension is (self.f_dim, ).
  3. Set the initial states of the data source. For example, set the next sequence index to the beginning:

    self.next_seq_idx = 0
    
  4. Finally, create your minibatch class based on UserMinibatchSource and put the above data access preparation steps in its constructor:

    class MyMultiWorkerDataSource(UserMinibatchSource):
        def __init__(self, f_dim, l_dim):
            self.f_dim, self.l_dim = f_dim, l_dim
            self._prepare_data()
            #setting the state
            self.fsi = StreamInformation("features", 0, 'sparse', np.float32, (self.f_dim,))
            self.lsi = StreamInformation("labels", 1, 'dense', np.float32, (self.l_dim,))
            self.sequences = sorted(self.data)
            self.next_seq_idx = 0
            super(MyMultiWorkerDataSource, self).__init__()
    

    Do not forget to call the super class's constructor: super(MyMultiWorkerDataSource, self).__init__().

Override stream_infos() method to provide meta-information of data:

After the preparation is done by the constructor, we can implement stream_infos() simply by returning the list of stream information instances:

def stream_infos(self):
    return [self.fsi, self.lsi]

With this method implemented, the underlying minibatch source framework will be able to refer to the meta information by the names "features" and "labels".

Override next_minibatch() method to provide data

Let us first review the function signature of the next_minibatch method:

def next_minibatch(self, num_samples, number_of_workers, worker_rank, device)

This method is invoked by the outer CNTK learning loops with four parameters:

  • the number of samples needed,
  • the number of workers,
  • the worker rank (i.e. worker ID), and
  • the device to which the data should be copied.

In other words, it is the user minibatch source's responsibility to interpret these parameters and provide minibatch data accordingly. The minibatch source needs to ensure that

  • the returned minibatch contains at most the specified number of samples,
  • the returned minibatch contains only the data that are supposed to be assigned to the specified worker (identified by worker_rank) - it is the user minibatch source's responsibility to ensure that the data load is balanced across workers in some manner, and
  • the data are ready on the specified device (e.g. CPU or GPU).

To make these requirements stand out, in the example below we implement a private function _prepare_nextbatch() to encapsulate the details:

def _prepare_nextbatch(self, num_samples, number_of_workers, worker_rank):
    # details....
    return features, f_sample_count, labels, l_sample_count, sweep_end

This function ensures that features and labels contain at most num_samples samples. The sample counts are also returned as f_sample_count and l_sample_count, respectively. Note that different data streams might contain different numbers of samples. In addition, sweep_end indicates whether this minibatch is at the end of a sweep over the whole data set.

To define a user minibatch source that can be used with distributed learners (e.g. block momentum), we need to use number_of_workers to cut the data into slices and then return the slice corresponding to the worker_rank that requested the next minibatch. In this private function, we implement a naive logic that distributes the data to a specific worker by skipping a sequence if its sequence ID modulo the number of workers does not equal the worker rank:

if (seq_id % number_of_workers) != worker_rank:
    self.next_seq_idx += 1
    continue

This is only for demonstration purposes. In practice, the distribution of data to workers should be based on a more efficient mechanism: e.g. on how cheaply a specific worker can access a specific subset of the data, and on the randomization mechanism.
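
As a concrete illustration of this modulo rule (plain Python, using the four toy sequence IDs from the sample data above), two workers would split the sequences as follows:

# Illustration only: with 2 workers, sequence IDs 0 and 2 go to worker 0,
# while sequence IDs 1 and 3 go to worker 1.
number_of_workers = 2
for seq_id in range(4):
    print("sequence %d -> worker %d" % (seq_id, seq_id % number_of_workers))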

After the data is prepared, we need to convert it into values that CNTK operators can operate on efficiently. This is done by creating various types of cntk.Value instances:

feature_data = C.Value.one_hot(batch=features, num_classes=self.f_dim, device = device)
label_data = C.Value(batch=np.asarray(labels, dtype=np.float32), device = device)

In this example, the feature data are sparse and are created through the cntk.Value.one_hot function (each element within a sequence is a one-hot vector). The label data are dense and are created through the cntk.Value constructor. Note that in these CNTK value constructors, we explicitly specify on which device the values should be constructed. Recall that the device parameter is provided by the outer learning loops.
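
As a small aside, the difference between the two constructions can be seen directly on toy data of our own (the dimensions below are made up and unrelated to the walk-through's data set):

toy_features = [[0, 2], [1]]                          # two sequences of word indices
toy_labels = np.asarray([[[1, 0, 0]], [[0, 1, 0]]], dtype=np.float32)
fv = C.Value.one_hot(batch=toy_features, num_classes=4, device=C.device.cpu())
lv = C.Value(batch=toy_labels, device=C.device.cpu())
print(fv.is_sparse, lv.is_sparse)                     # expected: True False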

Finally, we need to create MinibatchData instances and return them in a dictionary with the corresponding StreamInformation instances as keys:

res = {
    self.fsi: MinibatchData(feature_data, num_seq, feature_sample_count, sweep_end),
    self.lsi: MinibatchData(label_data, num_seq, label_sample_count, sweep_end)}
return res

The constructor of MinibatchData takes 1) the data already in cntk.Value form (i.e. feature_data and label_data here), 2) the number of sequences in the minibatch, 3) the number of samples, and 4) whether it is at the end of a sweep over the whole data set.

Altogether, we've implemented our next_minibatch() method to provide minibatches of data with the specified properties for the outer learning loops to consume.

Override get_checkpoint_state() and restore_from_checkpoint() methods to provide checkpoint state and restore from it

Firstly, we need to define the state of our user minibatch source so that the data feeding process can be restored from the exact point where it was stopped. In our simple example, we just need to know the next sequence index to restore the data feeding process, using the following get and restore checkpoint methods:

def get_checkpoint_state(self):
    return {'next_seq_idx': self.next_seq_idx}
def restore_from_checkpoint(self, state):
    self.next_seq_idx = state['next_seq_idx']

It is easy to see that a checkpoint state is a dictionary mapping string keys to the corresponding state values. In this example, the state is the next sequence index.
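
For example, assuming an instance mbs of the class we are building (as created in the training example below), a checkpoint round trip looks like this:

state = mbs.get_checkpoint_state()    # e.g. {'next_seq_idx': 2}
# ... the training process is interrupted and later restarted ...
mbs.restore_from_checkpoint(state)    # data feeding resumes at sequence index 2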

The complete user minibatch example

Altogether, we have our complete user minibatch source implementation as follows:

In [3]:
class MyMultiWorkerDataSource(UserMinibatchSource):
    def __init__(self, f_dim, l_dim):
        self.f_dim, self.l_dim = f_dim, l_dim
        self._prepare_data()
        #setting the state
        self.fsi = StreamInformation("features", 0, 'sparse', np.float32, (self.f_dim,))
        self.lsi = StreamInformation("labels", 1, 'dense', np.float32, (self.l_dim,))
        self.sequences = sorted(self.data)
        self.next_seq_idx = 0
        super(MyMultiWorkerDataSource, self).__init__()

    def _prepare_data(self):
        """
        Parse the text and load the data into self.data.
        self.data is of the following structure:
           sequence id -> "features" -> list of features
        and
          sequence id -> "labels" -> label
        """
        self.data = {}
        for line in sample_data.split('\n'):
            line = line.strip()
            if not line:
                continue
            seq_id, data = line.split('|', 1)
            data = data.split("|")
            seq_id = int(seq_id.strip())

            if seq_id not in self.data:
                self.data[seq_id] = {'features': []}

            # Processing features - expecting one per line.
            features = data[0].split(" ")
            vocab_idx = int(features[1].split(":")[0])
            self.data[seq_id]['features'].append(vocab_idx)

            # Process label, if exists
            if len(data) == 2:
                labels = np.asarray([data[1].split(" ")[1:]], dtype=np.float32)
                self.data[seq_id]['labels'] = labels

    def _prepare_nextbatch(self, num_samples, number_of_workers, worker_rank):
        features = []
        labels = []
        sweep_end = False
        f_sample_count = l_sample_count = 0

        while max(f_sample_count, l_sample_count) < num_samples:
            if self.next_seq_idx == len(self.sequences):
                sweep_end = True
                self.next_seq_idx = 0

            seq_id = self.sequences[self.next_seq_idx]
            # Based on the worker rank, determine whether to add this
            # sequence to the batch: if the sequence doesn't belong to this
            # worker, skip it. In practice, this should be based on a more
            # efficient mechanism: e.g. on the location of the worker
            # and the data location.
            if (seq_id % number_of_workers) != worker_rank:
                self.next_seq_idx += 1
                continue
            feature_data = self.data[seq_id]['features']
            label_data = self.data[seq_id]['labels']
            if (features or labels) and \
                max(f_sample_count+len(feature_data), \
                    l_sample_count+len(label_data)) > num_samples:
                break
            f_sample_count += len(feature_data)
            features.append(feature_data)

            l_sample_count += len(label_data)
            labels.append(label_data)

            self.next_seq_idx += 1
        return features, f_sample_count, labels, l_sample_count, sweep_end

    def stream_infos(self):
        """
        Override the stream_infos method of the base UserMinibatchSource class
        to provide stream meta information.
        """
        return [self.fsi, self.lsi]

    def next_minibatch(self, num_samples, number_of_workers, worker_rank, device):
        """
        Override the next_minibatch method of the base UserMinibatchSource class
        to provide minibatch data.
        """
        features, feature_sample_count, \
        labels, label_sample_count, sweep_end = self._prepare_nextbatch(num_samples,
                                                                        number_of_workers,
                                                                        worker_rank)
        feature_data = C.Value.one_hot(batch=features, num_classes=self.f_dim, device = device)
        label_data = C.Value(batch=np.asarray(labels, dtype=np.float32), device = device)
        num_seq = len(features)
        res = {
                self.fsi: MinibatchData(feature_data, num_seq, feature_sample_count, sweep_end),
                self.lsi: MinibatchData(label_data, num_seq, label_sample_count, sweep_end)
                }
        return res

    def get_checkpoint_state(self):
        return {'next_seq_idx': self.next_seq_idx}

    def restore_from_checkpoint(self, state):
        self.next_seq_idx = state['next_seq_idx']

Note that in this example, for simplicity, we load the whole data set into memory. In practice, the minibatch source should rely on the data source state (e.g. the mapping between the requested next-batch data and its logical/physical location in the data storage) to load (or pre-load) the data at the point when (or right before) it is requested.
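
Before wiring the source into a training session, a quick sanity check of our own (not part of the original walk-through) shows that the source can already serve minibatches directly; num_sequences and num_samples are properties of MinibatchData:

mbs_check = MyMultiWorkerDataSource(1000, 5)
mb = mbs_check.next_minibatch(7, number_of_workers=1, worker_rank=0, device=C.device.cpu())
print(mb[mbs_check.fsi].num_sequences, mb[mbs_check.fsi].num_samples)  # expected: 2 7
print(mb[mbs_check.lsi].num_sequences, mb[mbs_check.lsi].num_samples)  # expected: 2 2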

Using the user minibatch data source in training sessions with distributed learners

The implemented minibatch source can then be used wherever a MinibatchSource instance is accepted. For example:

In [4]:
input_dim = 1000
num_output_classes = 5

# instantiating the user minibatch source
mbs = MyMultiWorkerDataSource(input_dim, num_output_classes)
feature = C.sequence.input_variable(shape=(input_dim,))
label = C.input_variable(shape=(num_output_classes,))

# setting up the model
rnn = C.layers.Recurrence(C.layers.LSTM(20), go_backwards=False)(feature)
end = C.sequence.last(rnn)
z = C.layers.Dense(num_output_classes)(end)
loss = C.cross_entropy_with_softmax(z, label)
errs = C.classification_error(z, label)
local_learner = C.sgd(z.parameters,
                      C.learning_parameter_schedule_per_sample(0.5))
dist_learner = C.distributed.data_parallel_distributed_learner(local_learner)
# and train
trainer = C.Trainer(z, (loss, errs),
                    [dist_learner],
                    [C.logging.ProgressPrinter(tag='Training', num_epochs=10)])
input_map = {
    feature: mbs.fsi,
    label: mbs.lsi
}
session = C.training_session(
    trainer = trainer,
    mb_source = mbs,
    model_inputs_to_streams = input_map,
    mb_size = 7,
    max_samples = 80,
    progress_frequency = 20
)
session.train()
#finalize the distributed learning
C.distributed.Communicator.finalize()
Finished Epoch[1 of 10]: [Training] loss = 1.564692 * 20, metric = 90.00% * 20 0.600s ( 33.3 samples/s);
Finished Epoch[2 of 10]: [Training] loss = 1.035505 * 20, metric = 5.00% * 20 0.045s (444.4 samples/s);
Finished Epoch[3 of 10]: [Training] loss = 0.364257 * 20, metric = 0.00% * 20 0.043s (465.1 samples/s);
Finished Epoch[4 of 10]: [Training] loss = 0.133813 * 20, metric = 0.00% * 20 0.052s (384.6 samples/s);

User minibatch sources in restricted scenarios

In certain simplified scenarios, we might not want to implement a minibatch source with full functionality.

  • If data-parallel learning is not required, we can omit the logic that distributes data to workers: simply assume number_of_workers = 1 and worker_rank = 0 when overriding the next_minibatch() method (see the sketch after this list).
  • If checkpoint restoration is not required, we can omit the two checkpoint-related methods: get_checkpoint_state() and restore_from_checkpoint().
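
For instance, a minimal single-worker variant could simply reuse the class defined above and ignore the distribution parameters; the subclass below is our own sketch, not part of the walk-through:

class MySingleWorkerDataSource(MyMultiWorkerDataSource):
    def next_minibatch(self, num_samples, number_of_workers, worker_rank, device):
        # Ignore the distribution parameters and always behave as the only worker.
        return super(MySingleWorkerDataSource, self).next_minibatch(num_samples, 1, 0, device)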