# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
"""Base reader object from which all the readers of BasicTools inherit
"""
import os
import sys
import struct
import locale
import numpy as np
from BasicTools.Helpers.BaseOutputObject import BaseOutputObject
[docs]class ReaderBase(BaseOutputObject):
""" ReaderBase class"""
def __init__(self,fileName = None) :
super(ReaderBase,self).__init__()
self.fileName = None
self.readFormat = 'r'
self.nodalFields = {}
self.elementsFields = {}
self.binary = False
self.string = None
self.commentChar = None
self.filePointer = None
self.lineCounter = 0
self.pipe = False
self.encoding = locale.getpreferredencoding(False)
self.canHandleTemporal = False
self.extraOutput = None
self.SetFileName(fileName)
[docs] def SetBinary(self, binary = True):
"""Sets the binary status of the file to read
Parameters
----------
binary : bool, optional
if True, sets the file to read as binary, by default True
"""
self.binary = binary
if binary:
if self.readFormat.find("b") >= 0:
return
self.readFormat += "b"
else:
if self.readFormat.find("b") >= 0:
self.readFormat = self.readFormat.replace("b","")
[docs] def StartReading(self):
if not(self.fileName is None):
if self.readFormat.find('b') > -1 :
self.filePointer = open(self.fileName, self.readFormat)
self.text_stream = self.filePointer
else:
#I have some problems reading with numpy fromfile if the file is
#open with the codecs.open
#import sys
self.filePointer = open(self.fileName, self.readFormat,encoding=self.encoding)
#import codecs
#if sys.version_info[0] == 2:
# self.filePointer = codecs.open(self.fileName, self.readFormat, 'utf-8')
elif not(self.string is None):
if self.readFormat.find('b') > -1 :
from io import BytesIO
if type(self.string) is str:
self.filePointer = BytesIO(bytearray(self.string,self.encoding))
else:
self.filePointer = BytesIO(self.string)
self.text_stream = self.filePointer
else:
import io # Python3
self.filePointer = io.StringIO(self.string)
self.text_stream = self.filePointer
elif self.pipe:
import os
r, w = os.pipe()
if self.readFormat.find('b') > -1 :
self.filePointer = sys.stdin.buffer
#os.fdopen(r, self.readFormat)
self.text_stream = self.filePointer
else:
#I have some problems reading with numpy fromfile if the file is
#open with the codecs.open
#import sys
self.filePointer = sys.stdin
#os.fdopen(r, self.readFormat)
else:
raise Exception('Need a file or a string to read')
self.lineCounter = 0
[docs] def GetFilePointer(self):
return self.filePointer
[docs] def EndReading(self):
self.filePointer.close()
[docs] def SetFileName(self, fileName):
"""Sets the name of file to read
Parameters
----------
fileName : str
file name to set
"""
if not(fileName is None) and len(fileName) >= 4 and fileName[0:4] == "PIPE" :
self.SetReadFromPipe()
else:
self.fileName = fileName;
if fileName is None :
self.__path = None;
self.string = None;
else:
self.filePath = os.path.abspath(os.path.dirname(fileName))+os.sep;
self.string = None
self.pipe = False
[docs] def SetStringToRead(self,string):
"""Sets data to be read as a string instead of a file
Parameters
----------
string : str
data to be read
"""
self.string = string
if string is not None:
self.fileName = None
self.pipe = False
[docs] def SetReadFromPipe(self):
self.SetFileName(None)
self.SetStringToRead(None)
self.pipe = True
[docs] def PeekLine(self,withError=False):
"""Read a line without advancing"""
pos = self.filePointer.tell()
line = self.filePointer.readline()
self.filePointer.seek(pos)
return line
[docs] def Peek(self,length=1):
"""Read a length number of chars without advancing the file """
pos = self.filePointer.tell()
data = self.filePointer.read(length) # Might try/except this line, and finally: f.seek(pos)
self.filePointer.seek(pos)
return data
[docs] def ReadCleanLine(self,withError=False):
while(True):
string = self.filePointer.readline()
self.lineCounter +=1
#end of file
if string == "" :
if withError :
if self.fileName is None:
raise("Problem reading string : at line " +str(self.lineCounter))
else:
raise(Exception("Problem reading file :" +str(self.fileName) + " at line" +str(self.lineCounter) ))
return None
string = string.replace(u'\ufeff', '').strip(u' \r\n')
#empty line
if len(string) == 0 :
continue
if self.commentChar is None:
break# pragma: no cover
else :
inbreak = False
for i,j in zip(string,self.commentChar):
if i != j :
inbreak = True
break
if inbreak:
break
#if string[0] != self.commentChar:
# break
return string
##binary interface
[docs] def rawread(self,cpt,withError=False):
res = self.filePointer.read(cpt)
if withError and len(res) == 0:
raise EOFError("Problem reading file :" +str(self.fileName) + " EOF")
else:
return res
[docs] def readInt32(self):
rawdata = self.rawread(4,withError=True)
data = struct.unpack("i", rawdata)[0]
return data
[docs] def readInt64(self):
rawdata = self.rawread(8,withError=True)
data = struct.unpack("q", rawdata)[0]
return data
[docs] def readData(self,cpt,datatype):
try:
return np.fromfile(self.filePointer,dtype=datatype,count=cpt,sep="")
except:
s = np.dtype(datatype).itemsize*cpt
data = self.filePointer.read(s)
return np.frombuffer(data,dtype=datatype)
[docs] def reshapeData(self,data,finalShape=None):
if finalShape is None:
return data
else:
data.shape = finalShape
return data
[docs] def readFloats32(self,cpt,finalShape=None):
return self.reshapeData(self.readData(cpt,np.float32), finalShape)
[docs] def readFloats64(self,cpt,finalShape=None):
return self.reshapeData(self.readData(cpt,np.float64), finalShape)
[docs] def seek(self,cpt):
self.filePointer.seek(cpt)
[docs]def CheckIntegrity():
obj = ReaderBase()
obj.commentChar = "#"
obj.SetBinary(False)
try:
obj.StartReading()
raise # pragma: no cover
except :
pass
testString = """0
1
2
3
#this is a comment
4"""
obj.SetStringToRead(testString)
def checkBaseReaderAscii(obj):
obj.StartReading()
print("file Pointer: ", str(obj.GetFilePointer() ) )
if obj.PeekLine() != "0\n":
raise
if obj.Peek() != "0":
raise
for i in range(5):
if i != int(obj.ReadCleanLine()):
raise
#before last
if obj.ReadCleanLine() != None:
raise
error = False
try:
obj.ReadCleanLine(True) # this line must fail
error = True
except:
pass
if error:
raise(Exception("Must fail ") )
obj.EndReading()
checkBaseReaderAscii(obj)
from BasicTools.Helpers.Tests import WriteTempFile
fn = WriteTempFile('ReaderBaseTestString',testString)
obj.SetFileName(fn)
checkBaseReaderAscii(obj)
binarydata = np.array([0], dtype=np.int32).tobytes()
binarydata += np.array([1], dtype=np.int64).tobytes()
binarydata += np.array([2], dtype=np.float32).tobytes()
binarydata += np.array([3], dtype=np.float64).tobytes()
fn = WriteTempFile('ReaderBaseTestbinary',binarydata,mode='wb')
obj.SetBinary(False)
obj.SetBinary(True)
obj.SetBinary(False)
obj.SetBinary(False)
obj.SetBinary(True)
obj.SetBinary(True)
obj.SetFileName(fn)
print(obj.readFormat)
def checkBaseReaderBinary(obj):
obj.StartReading()
if obj.readInt32() != 0: raise
if obj.readInt64() != 1: raise
if obj.readFloats32(1) != 2.: raise
if obj.readFloats64(1) != 3.: raise
obj.EndReading()
checkBaseReaderBinary(obj)
obj.SetStringToRead(binarydata)
checkBaseReaderBinary(obj)
return "ok"
if __name__ == '__main__':
print(CheckIntegrity())# pragma: no cover