Source code for BasicTools.TensorTools.Formats

# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#


import numpy as np

from BasicTools.Helpers.TextFormatHelper import TFormat
# name comp is reserved for the u_comp = [u_1 u_2 u_3] = [u v w]

[docs]class BaseTensor(object): def __init__(self): self.cores = [];
[docs] def GetAxisNames(self): res = []; for c in self.cores: res.extend(c.GetAxisNames()) return list(set(res))
[docs] def GetNumberOfDims(self): return len(self.GetAxisNames())
[docs] def AddSubTensor(self,Gs): import copy self.cores.append(copy.deepcopy(Gs))
[docs] def Restriction(self,part={}): """ Particularisation of the canonic Tensor for some dimension , part is a dictionary with the names and the index to particularize""" " part = {'x':[1,3,4], 'comp':[0]}" import math import copy as cp res = cp.deepcopy(self) for c in res.cores: index = [] for n,cpt in zip(c.names,list(range(c.ndims))): if list(part.keys()).count(n) : #c.array = c.array.slice() index.append(slice(part[n],int(part[n]+math.copysign(1,part[n])), int(math.copysign(1,part[n])) ) ) else: index.append(slice(None)) c.array = c.array[tuple(index)]; return res
def __str__(self): res = str(type(self)).split(".")[-1].split("'")[0] + "\n" TFormat().II(); CI = getattr(self, "CheckIntegrity", None) if CI != None: if self.CheckIntegrity(verbose=False): res +=TFormat().GetIndent() + 'Integrity OK \n'; else: res += TFormat.InRed(TFormat().GetIndent() +'Integrity NOT OK !!!')+'\n'; res += TFormat().GetIndent() + "GetAxisNames : " + str(", ".join(self.GetAxisNames())) cpt = 0 for c in self.cores: res += "\n"+ TFormat().GetIndent() +str(cpt) +") " TFormat.II(); res += c.__str__() TFormat.DI(); cpt +=1 TFormat.DI(); return res
[docs]class FullTensor(BaseTensor): """ a numpy darray with names """ def __init__(self, nRanks=2 , data = None): super(type(self),self).__init__() self.ndims = nRanks; if data is None: self.array = np.ndarray(shape=(1,)*nRanks,dtype=float); else: self.array = data; self.names = ['']*nRanks;
[docs] def SetRanksNames(self, names): assert(len(names) == self.ndims) self.names = names
[docs] def GetAxisNames(self): return self.names;
def __str__(self): res = 'FullTensor : ' if len(self.names): res += "" + ", ".join(self.names) res += " ("+ (",".join([str(x) for x in self.array.shape]) )+")" return res
[docs]class SparseBaseTensor(BaseTensor): def __init__(self): super(SparseBaseTensor,self).__init__()
[docs] def CheckIntegrity(self,verbose=True): # first we get the list of all the Spaces sizes = {}; # now for each SubTensor we check the size of the ranks res = True; for c in self.cores: for n,s in zip(c.names,c.array.shape): if n in sizes: if (sizes[n] != s) : res = False; if verbose: print("Tensor Integrity check fail! : dimension '" + n + "' is inconsistent") else: sizes[n] = s if res and verbose : print ("Tensor Integrity ok") return res
[docs] def Reduction(self,redRanks=[]): import numpy as np # first we check if the reduction will generate a full tensor # we never do the reduction in the comp # for the moment we dont know how to treat the comp dic = 'abcdefghijklmnopqrstuvwxyz' # generation of the dictionary spaces = self.GetAxisNames() #print(spaces) dictionary = dict(list(zip(spaces, dic[0:len(spaces)]))) # to count the number of singleDouble = {}; script = '' for G in self.cores: script += "".join([dictionary[x] for x in G.names ]) script += "," for y in G.names: if y in singleDouble: singleDouble[y] += 1 else: singleDouble[y] = 1 #print([x for x in singleDouble if singleDouble[x] == 1]) script = script[0:-1] + "->" # Generation of the operation # the independent index are concatenated in the same order #print('-----') #print(singleDouble) fullDim = 0; names = [] for G in self.cores: for y in G.names: if singleDouble[y] == 1: script += dictionary[y] fullDim +=1; singleDouble[y] = 0 names.append(y); print(script) data = np.einsum(script, *[ G.array for G in self.cores]); res = FullTensor(nRanks = fullDim) res.array = data res.SetRanksNames(names); return res
[docs]class CanonicTensor(SparseBaseTensor): def __init__(self): super(CanonicTensor,self).__init__()
[docs] def AddSubTensor(self,data, name=None): if name is None: super(CanonicTensor, self).AddSubTensor(data) return G1 = FullTensor(nRanks=2) G1.SetRanksNames([name,'alpha1']) G1.array = data; super(CanonicTensor, self).AddSubTensor(G1)
def __add__(self, other): if type(self) != type(other): return TensorSum(self,other) else: import copy res = copy.copy(self) if self is other : res *= 2 return res else: res += other return res def __iadd__(self,other): if type(self) != type(other): raise Exception("can't do an inplace operation with different types of objects")# pragma: no cover else: if self is other : return self*2 else: for G1,G2 in zip(self.cores,other.cores): x = np.concatenate((G1.array, G2.array), axis=1) G1.array = x return self def __mul__(self,other): if type(other) == float or type(other) == int: import copy res = copy.copy(self) res *= other return res else : raise Exception("Don't know how to multiply a tensor and a " + str(type(other))) def __imul__(self,other): if type(other) == float or type(other) == int: self.cores[0].array *= other return self else: raise Exception("Don't know how to multiply a tensor and a " + str(type(other)) ) # pragma: no cover
[docs]class TensorTrain(SparseBaseTensor): def __init__(self): super(type(self),self).__init__()
[docs] def AddSubTensor(self,data, name=None): if name is None: super(TensorTrain, self).AddSubTensor(data) return #if empty if len(self.cores) == 0 and len(data.shape) != 2: # pragma: no cover raise Exception("The first core must be of dimensionality 2") if len(self.cores) > 1 and self.cores[-1].GetNumberOfDims() == 2 :# pragma: no cover raise Exception("Can't add a new core (last core is only of rank 2)") if len(self.cores) == 0 : G1 = FullTensor(nRanks=2) G1.SetRanksNames([name,'alpha1']) G1.array = data; super(TensorTrain, self).AddSubTensor(G1) return if len(self.cores) > 0 : G1 = FullTensor(nRanks=len(data.shape)) names = [name] if len(data.shape) == 3: names.append('alpha'+str(len(self.cores)+1)) names.append(self.cores[-1].names[1]) print( names) G1.SetRanksNames(names) G1.array = data; super(TensorTrain, self).AddSubTensor(G1) return
# for the moment we use the reduction from the base class, slow but robust # def Reduction(self,redRanks=[]): # #for now only works fo # mat = [ G.array for G in self.cores] # mat.reverse() # print([ x.shape for x in mat]) # data = reduce(np.dot,mat) # res = FullTensor(nRanks = len(self.cores)) # res.array = data # res.SetRanksNames([X.names[0] for X in self.cores]); # return res
[docs]class TensorSum(BaseTensor): def __init__(self,first,second): super(type(self),self).__init__() if issubclass(type(first), SparseBaseTensor): self.AddSubTensor(first) else: raise Exception("TensorSum: First Object not a SparseBaseTensor class") # pragma: no cover if issubclass(type(second), SparseBaseTensor): self.AddSubTensor(second) else: raise Exception("TensorSum: Second Object not a SparseBaseTensor class") # pragma: no cover
[docs] def GetAxisNames(self): res = []; for c in self.cores: res.extend(c.GetAxisNames()) return list(set(res))
def __add__(self,other): import copy res = copy.deepcopy(self) res += other return res def __iadd__(self,other): if type(other) == TensorSum: self.cores.extend(other.cores) return self elif issubclass(type(other), SparseBaseTensor): self.AddSubTensor(other) return self else: raise Exception("TensorSum.__iadd__: Don't know how add type " + str(type(other)))
[docs]def CheckIntegrity(): import numpy as np # Test Full tensors ft = FullTensor(data=np.ndarray((2,3))); name = ft.GetAxisNames(); disp = ft.__str__() # Test SparseBaseTensor CheckIntegrity spt = SparseBaseTensor(); for n in [0,1]: G = FullTensor(nRanks=2); G.SetRanksNames(["X"+str(n) ,'alpha1']) G.array = np.ndarray((2,3+n)) spt.AddSubTensor(G) spt.CheckIntegrity() print(spt) spt.cores[1].array = np.ndarray((2,3)) spt.Restriction({'X1':int(0)}) spt.Reduction() # Test SparseBaseTensor CheckIntegrity ct = CanonicTensor() ct.AddSubTensor(spt.cores[0]) ct.AddSubTensor(spt.cores[1].array,'X1') ct2 = CanonicTensor() ct2.AddSubTensor(spt.cores[0]) ct2.AddSubTensor(spt.cores[1].array,'X1') # sum ct = ct + ct + ct2 ct += ct # product ct = ct *2 ct *= 2 #TODO try : ct = ct * ct return 'Not ok' # pragma: no cover except Exception as e: pass #TODO try : ct = ct * ct2 return 'Not ok' # pragma: no cover except Exception as e: pass # heterogenius tt = TensorTrain() print("1") print(tt) tt.AddSubTensor(spt.cores[0].array,'x0') print("2") print(tt) tt.AddSubTensor(np.empty([1, 1, 3]) ,'X1') print("3") print(tt) tt.AddSubTensor(np.empty([1, 1]) ,'X2') print("4") print(tt) su = ct + tt print(su) su += ct2 print(su) su = su +su print(su) try: su += 3 return'Not OK' # pragma: no cover except Exception as e: pass return 'OK'
if __name__ == '__main__': CheckIntegrity() # pragma: no cover