Source code for data.smiles_enumerator

# Experimental Class for Smiles Enumeration, Iterator and SmilesIterator
# adapted from Keras 1.2.2
from rdkit import Chem
import numpy as np
import threading


[docs]class Iterator(object): """Abstract base class for data iterators. # Arguments n: Integer, total number of samples in the dataset to loop over. batch_size: Integer, size of a batch. shuffle: Boolean, whether to shuffle the data between epochs. seed: Random seeding for data shuffling. """ def __init__(self, n, batch_size, shuffle, seed): self.n = n self.batch_size = batch_size self.shuffle = shuffle self.batch_index = 0 self.total_batches_seen = 0 self.lock = threading.Lock() self.index_generator = self._flow_index(n, batch_size, shuffle, seed) if n < batch_size: raise ValueError('Input data length is shorter than batch_size' 'Adjust batch_size')
[docs] def reset(self): self.batch_index = 0
def _flow_index(self, n, batch_size=32, shuffle=False, seed=None): # Ensure self.batch_index is 0. self.reset() while 1: if seed is not None: np.random.seed(seed + self.total_batches_seen) if self.batch_index == 0: index_array = np.arange(n) if shuffle: index_array = np.random.permutation(n) current_index = (self.batch_index * batch_size) % n if n > current_index + batch_size: current_batch_size = batch_size self.batch_index += 1 else: current_batch_size = n - current_index self.batch_index = 0 self.total_batches_seen += 1 yield (index_array[current_index:current_index + current_batch_size], current_index, current_batch_size) def __iter__(self): # Needed if we want to do something like: # for x, y in data_gen.flow(...): return self def __next__(self, *args, **kwargs): return self.next(*args, **kwargs)
[docs]class SmilesIterator(Iterator): """Iterator yielding data from a SMILES array. # Arguments x: Numpy array of SMILES input data. y: Numpy array of targets data. smiles_data_generator: Instance of `SmilesEnumerator` to use for random SMILES generation. batch_size: Integer, size of a batch. shuffle: Boolean, whether to shuffle the data between epochs. seed: Random seed for data shuffling. dtype: dtype to use for returned batch. Set to keras.backend.floatx if using Keras """ def __init__(self, x, y, smiles_data_generator, batch_size=32, shuffle=False, seed=None, dtype=np.float32): if y is not None and len(x) != len(y): raise ValueError('X (images tensor) and y (labels) ' 'should have the same length. ' 'Found: X.shape = %s, y.shape = %s' % (np.asarray(x).shape, np.asarray(y).shape)) self.x = np.asarray(x) if y is not None: self.y = np.asarray(y) else: self.y = None self.smiles_data_generator = smiles_data_generator self.dtype = dtype super(SmilesIterator, self).__init__(x.shape[0], batch_size, shuffle, seed)
[docs] def next(self): """For python 2.x. # Returns The next batch. """ # Keeps under lock only the mechanism which advances # the indexing of each batch. with self.lock: index_array, current_index, current_batch_size =\ next(self.index_generator) # The transformation of images is not under thread lock # so it can be done in parallel batch_x = np.zeros(tuple([current_batch_size] + [self.smiles_data_generator.pad, self.smiles_data_generator._charlen]), dtype=self.dtype) for i, j in enumerate(index_array): smiles = self.x[j:j + 1] x = self.smiles_data_generator.transform(smiles) batch_x[i] = x if self.y is None: return batch_x batch_y = self.y[index_array] return batch_x, batch_y
[docs]class SmilesEnumerator(object): """SMILES Enumerator, vectorizer and devectorizer #Arguments charset: string containing the characters for the vectorization can also be generated via the .fit() method pad: Length of the vectorization leftpad: Add spaces to the left of the SMILES isomericSmiles: Generate SMILES containing information about stereogenic centers enum: Enumerate the SMILES during transform canonical: use canonical SMILES during transform (overrides enum) """ def __init__(self, charset='@C)(=cOn1S2/H[N]\\', pad=120, leftpad=True, isomericSmiles=True, enum=True, canonical=False): self._charset = None self.charset = charset self.pad = pad self.leftpad = leftpad self.isomericSmiles = isomericSmiles self.enumerate = enum self.canonical = canonical @property def charset(self): return self._charset @charset.setter def charset(self, charset): self._charset = charset self._charlen = len(charset) self._char_to_int = dict((c, i) for i, c in enumerate(charset)) self._int_to_char = dict((i, c) for i, c in enumerate(charset))
[docs] def fit(self, smiles, extra_chars=[], extra_pad=5): """Performs extraction of the charset and length of a SMILES datasets and sets self.pad and self.charset #Arguments smiles: Numpy array or Pandas series containing smiles as strings extra_chars: List of extra chars to add to the charset (e.g. "\\\\" when "/" is present) extra_pad: Extra padding to add before or after the SMILES vectorization """ charset = set("".join(list(smiles))) self.charset = "".join(charset.union(set(extra_chars))) self.pad = max([len(smile) for smile in smiles]) + extra_pad
[docs] def randomize_smiles(self, smiles): """Perform a randomization of a SMILES string must be RDKit sanitizable""" m = Chem.MolFromSmiles(smiles) ans = list(range(m.GetNumAtoms())) np.random.shuffle(ans) nm = Chem.RenumberAtoms(m, ans) return Chem.MolToSmiles(nm, canonical=self.canonical, isomericSmiles=self.isomericSmiles)
[docs] def transform(self, smiles): """Perform an enumeration (randomization) and vectorization of a Numpy array of smiles strings #Arguments smiles: Numpy array or Pandas series containing smiles as strings """ one_hot = np.zeros((smiles.shape[0], self.pad, self._charlen), dtype=np.int8) for i, ss in enumerate(smiles): if self.enumerate: ss = self.randomize_smiles(ss) for j, c in enumerate(ss): one_hot[i, j, self._char_to_int[c]] = 1 return one_hot
[docs] def reverse_transform(self, vect): """ Performs a conversion of a vectorized SMILES to a smiles strings charset must be the same as used for vectorization. #Arguments vect: Numpy array of vectorized SMILES. """ smiles = [] for v in vect: # mask v v = v[v.sum(axis=1) == 1] # Find one hot encoded index with argmax, translate to char # and join to string smile = "".join(self._int_to_char[i] for i in v.argmax(axis=1)) smiles.append(smile) return np.array(smiles)
if __name__ == "__main__": smiles = np.array( ["CCC(=O)O[C@@]1(CC[NH+](C[C@H]1CC=C)C)c2ccccc2", "CCC[S@@](=O)c1ccc2c(c1)[nH]/c(=N/C(=O)OC)/[nH]2"] * 10) # Test canonical SMILES vectorization sm_en = SmilesEnumerator(canonical=True, enum=False) sm_en.fit(smiles, extra_chars=["\\"]) v = sm_en.transform(smiles) transformed = sm_en.reverse_transform(v) if len(set(transformed)) > 2: print("Too many different canonical SMILES generated") # Test enumeration sm_en.canonical = False sm_en.enumerate = True v2 = sm_en.transform(smiles) transformed = sm_en.reverse_transform(v2) if len(set(transformed)) < 3: print("Too few enumerated SMILES generated") # Reconstruction reconstructed = sm_en.reverse_transform(v[0:5]) for i, smile in enumerate(reconstructed): if smile != smiles[i]: print("Error in reconstruction %s %s" % (smile, smiles[i])) break # test Pandas import pandas as pd df = pd.DataFrame(smiles) v = sm_en.transform(df[0]) if v.shape != (20, 52, 18): print("Possible error in pandas use") # BUG, when batchsize > x.shape[0], then it only returns x.shape[0]! # Test batch generation sm_it = SmilesIterator(smiles, np.array([1, 2] * 10), sm_en, batch_size=10, shuffle=True) X, y = sm_it.next() if sum(y == 1) - sum(y == 2) > 1: print("Unbalanced generation of batches") if len(X) != 10: print("Error in batchsize generation")