PDS Note:
==============================================================================
The Python code Remove_pt2Hz_step.py was used by the partially-processed-to-
calibrated data processing pipeline to remove some artifacts from the InSight
IFG magnetometer 0.2 Hz data. It is an older version of the code that
eventually became Remove_2Hz_step.py and relies on some older library
functions that are included at the end of this file.

This code is designed to read and write data in a special self-documenting
binary structure called IGPP flatfiles (FF). A FF consists of a flat binary
data table in which time is stored as a double-precision count of seconds
since Jan 1, 1966 and the remaining columns are 4-byte floats. The expected
input data file contains an entire InSight release. The output file has some
spikes and steps removed and a data quality flag column added to indicate
where data were modified.

This file is provided with the PDS archive as documentation, not as working
code. It will not work with the ASCII files in the PDS archive, but it could
be modified by a user to do so. It is provided here for SMD-41a and FAIR
compliance, and to document the algorithms used to identify and remove
artifacts from the IFG calibrated data.

Pipeline usage used the default settings:

    python3 Remove_pt2Hz_step.py inputFF outputFF

The code requires:
    python >=3.9
    numpy  >=1.16
    scipy  >=1.3
and the fflib files that are included at the end of this document.
================================================================================
Begin Readme file

Copyright (c) 2023 Regents of the University of California
All Rights Reserved
================================================================================
Remove_pt2Hz_step.py - The main script that makes the corrections in the data.
ffPy - A folder that contains the python flat file loading/writing library.
ffCreator.py - A module I use to generate flat files more easily.

Environment:
    python >=3.6
    numpy  >=1.16
    scipy  >=1.3

Note that the version of shiftCorr_pt2Hz.py in the zip file has a bug that was
corrected later. If you accidentally unzip the file, I've saved a copy of the
corrected code in the save_py folder.

Running the script:
To run the script with the default parameters, simply run:

    python3 Remove_pt2Hz_step.py inputFF outputFF

where inputFF is the name of the flat file to be corrected and outputFF is the
name to give to the new corrected flat file.
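For orientation, a flat binary table with the layout described in the PDS note
above could be read directly with numpy along these lines (a minimal sketch:
the .ffd file name, column count, and byte order are assumptions for
illustration; the script itself uses the bundled FF_File library, which reads
the actual layout from the flatfile header):

    import numpy as np

    NCOLS = 4  # number of 4-byte float columns -- assumed for illustration
    # One float64 time column followed by NCOLS float32 data columns;
    # big-endian byte order is an assumption here.
    rec = np.dtype([('t', '>f8'), ('cols', '>f4', (NCOLS,))])
    table = np.fromfile('example.ffd', dtype=rec)
    times = table['t']        # seconds since Jan 1, 1966
    data = table['cols'].T    # shape (NCOLS, N); one row per component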
The following optional command-line arguments may also be passed to override
the defaults:

--winLen       - Number of points to look at beyond a potential onset for
                 potential offsets
--refRow       - Row used to identify slope changes (Bx, By, Bz = 0, 1, 2)
--t0           - The first (larger) threshold used to check for potential
                 onsets
--t1           - The smaller threshold used to check for potential onsets
--avgWin       - Number of points to use when calculating the average of a
                 field near a given index, and the number of points to smooth
                 on each edge of a correction
--compPerc     - Percentage used when comparing how close amplitudes are to
                 each other
--compoundFrac - Fraction used to set the upper bound for the sum of a set of
                 amplitudes in order for it to count as a compound shift

For example, to run the program with a window length of 1600 and with the
reference row set to 2, one should run:

    python3 Remove_pt2Hz_step.py --winLen=1600 --refRow=2 inputFF outputFF

Other parameters that are less likely to be changed (as well as those listed
above) can be found in the initialization function of the ShiftCorrector class
(line 531), with comments explaining what they are used for.

The command-line arguments can also be quickly reviewed by running:

    python3 Remove_pt2Hz_step.py --help
==============================================================================
Begin Python Code
==============================================================================
# Insight correction
import argparse
import numpy as np
import scipy
from scipy import signal
import bisect
import sys
sys.path.insert(0, 'ffPy')
import FF_File
from ffCreator import createFF

# Note: Plotting/debugging tools left in as comments
# import matplotlib.pyplot as plt

# *Flags*
# pcFlag = False # Plot each partial correction step
# itFlag = False # After each correction
# cpFlag = False # Plot each compound step
# ssFlag = False # Plot simple shift correction steps
# ppFlag = False # Plot points
# cmpPtsFlag = ppFlag and False # Plot compound points
# allPtsFlag = ppFlag and False # Plot all points
# linAmpFlag = False # Plot base slope
# ringPtsFlag = ppFlag and False # Plot points identified as ringing
# natPtsFlag = ppFlag and False
# mlstPtsFlag = ppFlag and False

class InsightDataCorrector():
    def __init__(self, times, dta, mlst=None, res=None):
        self.times = times
        self.data = dta
        self.dtaLen = len(dta[0])
        self.prevData = np.array(dta)
        self.MLST = mlst
        # self.fig, self.axes = plt.subplots(3, 1, sharex=True)
        # self.fig.subplots_adjust(hspace=0, right=0.95, top=0.95)

        # Set up data quality array
        self.dqf = np.zeros((self.dtaLen, 15), dtype=np.int8)
        self.issueLocs = {'Peak': 8, 'Simple Shift': 9, 'Irregular Shift': 10,
                          'Irregular steps/ramps': 11}
        self.qualityVals = {'No Issue': 0, 'Found+Corrected': 1,
                            'Found+Full_Partial': 2, 'Found+Partial': 3,
                            'Found': 4, 'Not Evaluated': 5}

    # Some useful plotting functions for examining modified data
    # def plotPoints(self, pts, dataRow, label=None, ptLabels=None):
    #     # Plot specified points (list of indices) from original data
    #     if label is None:
    #         label = ''
    #     ax = self.axes[dataRow]
    #     dtaSubset = self.prevData[dataRow][pts]
    #     timeSubset = self.times[pts]
    #     ax.plot(timeSubset, dtaSubset, 'o', label=label)
    #     if ptLabels is not None:
    #         prevLst = set()
    #         for i in range(0, len(pts)):
    #             if ptLabels[i] not in prevLst:
    #                 ax.annotate(ptLabels[i], (timeSubset[i], dtaSubset[i]))
    #                 prevLst.add(ptLabels[i])
    #     ax.legend()

    # def plotOriginalData(self):
    #     for row, kw, ax in zip([0,1,2], ['Bx_IFG','By_IFG','Bz_IFG'], self.axes):
    #         ax.plot(self.times, self.data[row], label=kw+' - Original')
    # def plotProcessedDta(self):
    #     for row, ax in zip([0,1,2], self.axes):
    #         ax.plot(self.times, self.data[row], label='Processed')
    #         ax.legend()

    def setQualityFlag(self, indexRange, errType, flagValue):
        val = self.qualityVals[flagValue]
        iO, iE = indexRange
        numVals = iE - iO
        if errType == 'Shift':
            loc = self.issueLocs['Simple Shift']
            prevFlags = self.dqf[iO:iE+1][:,loc]
            mask1 = (prevFlags != self.qualityVals['No Issue'])
            mask2 = (prevFlags != self.qualityVals['Not Evaluated'])
            mask = mask1 & mask2
            if True in mask:
                irrShiftLoc = self.issueLocs['Irregular Shift']
                self.dqf[iO:iE+1][:,irrShiftLoc] = np.tile([val], numVals+1)
            self.dqf[iO:iE+1][:,loc] = np.tile([val], numVals+1)
        elif errType == 'Compound':
            loc = self.issueLocs['Irregular Shift']
            self.dqf[iO:iE+1][:,loc] = np.tile([val], numVals+1)
        elif errType == 'Peak':
            loc = self.issueLocs['Peak']
            self.dqf[iO][loc] = val
        elif errType == 'Sawtooth':
            loc = self.issueLocs['Irregular steps/ramps']
            self.dqf[iO:iE+1][:,loc] = val
        else:
            loc = 0
            self.dqf[iO][loc] = val

    def arrayToVal(self, x):
        return sum([x[i]*(10**i) for i in range(0, len(x))])

    def valToArray(self, x):
        # Converts the data quality flag value into individual flag values
        x = int(x)
        res = [int(str(x)[i]) for i in range(0, len(str(x)))]
        res = res[::-1]
        res = np.concatenate([res, np.zeros(7-len(str(x)))])
        return res

    def getQualityFlagCol(self):
        for col in range(8, 12):
            mask = self.dqf[:,col] == self.qualityVals['Not Evaluated']
            self.dqf[:,col][mask] = self.qualityVals['No Issue']
        dqfCol = self.dqf[:,8:15]
        col = []
        for row in range(0, len(dqfCol)):
            sumVal = 0
            for i in range(0, 15-8):
                sumVal += dqfCol[row][i] * (10 ** i)
            col.append(sumVal)
        return col

    def loadQualityFlagCol(self, dqf2):
        for row in range(0, len(dqf2)):
            self.dqf[row][8:15] = self.valToArray(dqf2[row])
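    # A worked example of the flag packing used above: the per-issue flags
    # live in dqf columns 8..14, and column 8+i contributes digit i of the
    # packed base-10 value. So Peak=1 (col 8) together with
    # Irregular Shift=3 (col 10) packs to 1*10**0 + 3*10**2 = 301, and
    # valToArray(301) recovers [1, 0, 3, 0, 0, 0, 0].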
    def processData(self, args):
        # Correct peaks in each field direction separately
        print ('Removing single-point spikes...')
        for row in [0, 1, 2]:
            dta = self.data[row]
            peakCorr = PeakCorrector(self.times, dta, self)

        print ('Identifying sawtooth glitches...')
        sawtoothIdent = SawtoothIdentifier(self.times, self.data, self.MLST, self)
        sawtoothIdent.findSawtoothGlitches(10.5, 16.75)

        print ('Correcting shifts...')
        # Use default or user-chosen parameters if defined
        shiftCorr = ShiftCorrector(self.times, self.data, parent=self, mlst=self.MLST)
        winLen = 1440 if args.winLen is None else args.winLen
        t0 = shiftCorr.t0 if args.t0 is None else args.t0
        t1 = shiftCorr.t1 if args.t1 is None else args.t1
        avgWin = shiftCorr.avgWin if args.avgWin is None else args.avgWin
        refRow = shiftCorr.refRow if args.refRow is None else args.refRow
        compPerc = shiftCorr.compPerc if args.compPerc is None else args.compPerc
        compFrac = shiftCorr.compoundPerc if args.compoundFrac is None else args.compoundFrac
        shiftCorr.correctShifts(winLen=winLen, t0=t0, t1=t1, avgWin=avgWin,
            perc=compPerc, refRow=refRow, compFrac=compFrac)

class PeakCorrector():
    # Operates on a single row of data (1 x N)
    def __init__(self, times, dta, parent):
        self.times = times
        self.data = dta
        self.dtaLen = len(dta)
        self.parent = parent
        self.res = self.times[1] - self.times[0]
        self.skipFlags = np.zeros(self.dtaLen, dtype=np.int8)

    def correctPeaks(self, spikeThresh, edgeThresh):
        index = 1
        while index < (self.dtaLen - 1):
            # Skip if data gap detected
            if (self.times[index+1] - self.times[index]) > (2 * self.res):
                index += 1
                continue

            left = self.data[index-1]
            center = self.data[index]
            right = self.data[index+1]

            edgeDiff = abs(right-left)
            leftDiff = abs(center-left)
            rightDiff = abs(center-right)

            # If the center value is signif. greater than the values to its
            # left/right and those same left/right values are not signif.
            # different from each other
            sigDiff = leftDiff > spikeThresh and rightDiff > spikeThresh
            if edgeDiff < edgeThresh and sigDiff:
                # Set this point to the average and update the quality flag
                avg = (left+right)/2
                self.data[index] = avg
                self.parent.setQualityFlag((index, index), 'Peak', 'Found+Corrected')
            index += 1
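    # Worked example of the despiking rule (the threshold values here are
    # illustrative; the actual values are supplied by the caller): with
    # spikeThresh=2 and edgeThresh=1, the triplet
    # (left, center, right) = (1.0, 9.0, 1.2) is flagged, since |9.0-1.0| > 2
    # and |9.0-1.2| > 2 while |1.2-1.0| < 1, and the center point is then
    # replaced by (1.0 + 1.2) / 2 = 1.1.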
class SawtoothIdentifier():
    def __init__(self, times, dta, mlst, parent):
        self.times = times
        self.data = dta
        self.mlst = mlst
        self.res = times[1] - times[0]
        self.parent = parent
        self.compRow = 1 # Row to check for little to no glitches in data
        self.sigRow = 2 # Row with sawtooth-like glitches
        self.filterWin_1 = 7 # Savgol filter parameters (first + second runs)
        self.filterPoly_1 = 2
        self.filterWin_2 = 3
        self.filterPoly_2 = 1
        self.lwrSlopeBnd = 0.0725 / self.res # Slopes should be steeper than this
        self.upprSlopeBnd = 0.5 / self.res # Slopes should not be steeper than this
        self.minAmp = 1 # Minimum abs amplitude of onset
        self.maxAmp = 5.5 # Maximum abs amplitude that an onset can have
        self.edgeSlopeThresh = 0.1 / self.res # At least one edge should have slopes > this
        self.maxDist = 150 # Max number of points that to-be-merged grps can be apart
        self.minWin = 250 # Minimum number of points that should make up a grp

    def getSlope(self, index, dta):
        # Returns slope relative to index and index - 1
        v1 = dta[index]
        v2 = dta[index-1]
        t1 = self.times[index]
        t2 = self.times[index-1]
        slope = (v1-v2) / (t1-t2)
        return slope

    def findAllRangesToCheck(self, t0, t1):
        # t0 = MLST hour to start looking for glitches
        # t1 = MLST hour to stop looking for glitches
        # Find all the indices that fall within MLST > t0 and MLST < t1
        mask = (self.mlst > t0) & (self.mlst < t1)
        indices = np.arange(0, len(self.mlst))
        indices = indices[mask]

        # Reduce the set of indices to a list of tuples corresponding to the
        # start/end indices for each range of consecutive indices
        indexPairs = []
        i = 0
        while i < len(indices):
            a = indices[i]
            j = i + 1
            while j < len(indices):
                if indices[j] - indices[j-1] > 1: # Indices no longer consecutive
                    break
                j += 1
            b = indices[j-1]
            indexPairs.append((a, b))
            i = max(i+1, j)
        return indexPairs
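    # Example: if the MLST mask passes indices [3, 4, 5, 9, 10], the loop
    # above reduces them to indexPairs = [(3, 5), (9, 10)].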
    def findAmpsAndSlopes(self, rowDta, times):
        # Gets the amplitudes and overall slope corresponding to index ranges
        # in the filtered data where the slope changes sign
        indices = []
        amps = []
        slopes = []
        index = 1
        while index < len(rowDta):
            currSlope = self.getSlope(index, rowDta)
            subIndex = index + 1
            while subIndex < len(rowDta) and np.sign(currSlope) == np.sign(self.getSlope(subIndex, rowDta)):
                subIndex += 1
            if subIndex == len(rowDta):
                subIndex = subIndex - 1
            if index == subIndex:
                index = index + 1
                continue
            indices.append((index, subIndex))
            amps.append(rowDta[subIndex] - rowDta[index])
            slope = (rowDta[subIndex]-rowDta[index])/(times[subIndex]-times[index])
            slopes.append(slope)
            index = max(index+1, subIndex)
        return indices, amps, slopes

    def checkNearbySlopes(self, lft, rght, dta, ofst):
        # Checks if slopes to left/right of onset are practically flat to
        # avoid misidentifying square-step-like glitches as sawtooth-like glitches
        win = 5
        rghtStart, rghtEnd = min(rght+1, rght), min(rght+win, len(dta))
        lftStart, lftEnd = max(0, lft-win), max(0, lft-1)
        rightSlopes = [self.getSlope(i, dta) for i in range(rghtStart, rghtEnd)]
        leftSlopes = [self.getSlope(i, dta) for i in range(lftStart, lftEnd)]
        rightSlopeAvg = np.mean(np.abs(rightSlopes))
        leftSlopeAvg = np.mean(np.abs(leftSlopes))
        if rightSlopeAvg < self.edgeSlopeThresh and leftSlopeAvg < self.edgeSlopeThresh:
            return False
        return True

    def findGlitchesInRange(self, a, b):
        # Get sig row's data and comp row's data and filter accordingly
        rowDta = self.data[self.sigRow][a:b]
        sigRowDta = signal.savgol_filter(rowDta, self.filterWin_1, self.filterPoly_1)
        sigRowDta = signal.savgol_filter(sigRowDta, self.filterWin_2, self.filterPoly_2)
        compRowDta = self.data[self.compRow][a:b]
        compRowDta = signal.savgol_filter(compRowDta, self.filterWin_1, self.filterPoly_1)

        # Find grps of indices w/ amplitudes and slopes that are within range
        # of the amps/slopes of sawtooth-like glitches
        sawtoothIndices = [[]]
        grpIndex = 0
        indices, amps, slopes = self.findAmpsAndSlopes(sigRowDta, self.times[a:b])
        largeJumps = []
        for (x, y), amp, slope in zip(indices, amps, slopes):
            # Skip very small jumps and square-wave-type onsets
            if abs(slope) < self.lwrSlopeBnd:
                continue
            if not self.checkNearbySlopes(x, y, rowDta, a):
                continue
            compAmp = compRowDta[y] - compRowDta[x]
            compSlope = compAmp / (self.times[y] - self.times[x])

            # Skip and break apart groups between large time gaps or steep
            # non-sawtooth-like jumps, and keep a list of jumps
            largeAmp = (abs(amp) > self.maxAmp and abs(slope) > self.upprSlopeBnd)
            largeGap = (len(sawtoothIndices[grpIndex]) >= 1 and x - sawtoothIndices[grpIndex][-1][1] > 100)
            if largeAmp or largeGap:
                sawtoothIndices.append([])
                grpIndex += 1
                if abs(amp) > self.maxAmp:
                    largeJumps.append(x)
                continue

            # Amp threshold reached and data in other row is not particularly glitchy
            if abs(amp) > self.minAmp and abs(compSlope) < abs(slope)/2:
                sawtoothIndices[grpIndex].append((x, y))

        # For each grp found, merge groups and make sure at least 2 indices
        # have an amplitude > 2.5 to make sure these are sawtooth-like glitches
        # and not just smaller variations
        grps = []
        for grp in sawtoothIndices:
            # Check if a small group can be merged w/ a previously added group
            if len(grp) <= 2:
                if len(grp) > 0 and len(grps) != 0:
                    lastStop = grps[-1][1]
                    grpEndStop = grp[0][0]
                    largeJumpFound = False
                    for i in range(lastStop, grpEndStop):
                        if i in largeJumps:
                            largeJumpFound = True
                            break
                    if grpEndStop - lastStop < self.maxDist and not largeJumpFound:
                        lastStart, otherStop = grps.pop()
                        grps.append((lastStart, lastStop))
                continue

            # Check that the current group has at least two onsets w/
            # an amp > 2.5
            start, stop = grp[0][0], grp[-1][1]
            count = 0
            for x, y in grp:
                amp = sigRowDta[y] - sigRowDta[x]
                if abs(amp) >= 2.5:
                    count += 1
                if count >= 2:
                    break
            if count >= 1 and len(grps) != 0:
                lastStop = grps[-1][1]
                largeJumpFound = False
                for i in range(lastStop, start):
                    if i in largeJumps:
                        largeJumpFound = True
                        break
                if start - lastStop < self.maxDist and not largeJumpFound:
                    a, b = grps.pop()
                    grps.append((a, stop))
            if count < 2:
                continue

            # Adjust end point to index where data becomes less noisy and add to list
            stop = self.adjustEndPoint(stop, rowDta, largeJumps)
            grps.append((start, stop))

        if len(grps) > 1:
            # Merge any grps that are very close together
            mergedGrps = []
            a, b = grps[0]
            z = 1
            while z < len(grps):
                c, d = grps[z]
                if c - b < self.maxDist:
                    if len(mergedGrps) > 0:
                        mergedGrps.pop()
                    mergedGrps.append((a, d))
                    z += 1
                    a, b = a, d
                else:
                    mergedGrps.append((a,b))
                    mergedGrps.append((c,d))
                    z += 1
                    a, b = c, d

            # Skip any very short groups
            grps = []
            for grp in mergedGrps:
                a, b = grp
                if b - a >= self.minWin:
                    grps.append((a, b))

        return grps

    def adjustEndPoint(self, stopIndex, sigRowDta, jumpLst):
        # Looks for the point past the end of the grp where the data
        # starts to be less noisy, marking the end of the sawtooth glitches
        index = stopIndex
        slopeThresh = 0.05 / self.res
        win = 15
        while index - win < len(sigRowDta):
            if index in jumpLst:
                break
            strt, stp = index, min(len(sigRowDta), index+1)
            nextSlopes = [self.getSlope(i, sigRowDta) for i in range(strt, stp)]
            avgSlopes = np.mean(np.abs(nextSlopes))
            if avgSlopes < slopeThresh:
                break
            index += 1
        if index > stopIndex + 300 or index-win >= len(sigRowDta):
            index = stopIndex
        return index + int(win/2)

    def findSawtoothGlitches(self, t0, t1):
        indexRanges = self.findAllRangesToCheck(t0, t1)
        for a, b in indexRanges:
            glitchRanges = self.findGlitchesInRange(a, b)
            for x, y in glitchRanges:
                # Adjust indices for offset
                x = a + x
                y = a + y
                # Set quality flag for range
                self.parent.setQualityFlag((x, y), 'Sawtooth', 'Found')

class ShiftHelperFunctions():
    def __init__(self, times, dta, res, tol):
        self.times = times
        self.data = dta
        self.res = res
        self.tol = tol

    def withinPerc(self, val1, val2, perc):
        # Tests if val1 is within the given percent of val2
        lowerVal = val2 * (1-perc)
        upperVal = val2 * (1+perc)
        lowerBnd = min(lowerVal, upperVal)
        upperBnd = max(lowerVal, upperVal)
        if (lowerBnd - self.tol) <= val1 and val1 <= (upperBnd + self.tol):
            return True
        return False

    def withinEpsilon(self, val1, val2, epsilon):
        lowerVal = val2-epsilon
        upperVal = val2+epsilon
        lowerBound = min(lowerVal, upperVal)
        upperBound = max(lowerVal, upperVal)
        if (lowerBound - self.tol) < val1 and val1 < (upperBound + self.tol):
            return True
        return False

    def vecWithinPerc(self, vec1, vec2, perc, appCount=3):
        # Checks if vec1's values are within perc of vec2's values
        # appCount = # of rows that need to satisfy this property to return True
        count = 0
        for row in range(0, 3):
            if self.withinPerc(vec1[row], vec2[row], perc):
                count += 1
        if count >= appCount:
            return True
        return False

    def vecWithinEpsilon(self, vec1, vec2, epsilon, appCount=3):
        # Checks if vec1's values are within epsilon of vec2's values
        # appCount = # of rows that need to satisfy this property to return True
        count = 0
        for row in range(0, 3):
            if self.withinEpsilon(vec1[row], vec2[row], epsilon):
                count += 1
        if count >= appCount:
            return True
        return False

    def vecNorm(self, vec):
        return np.sqrt(np.dot(vec, vec))

    def getDiffVec(self, i):
        return self.data[:,i+1] - self.data[:,i-1]

    def getSlope(self, index, dta):
        # Returns slope relative to index and index - 1
        v1 = dta[index]
        v2 = dta[index-1]
        t1 = self.times[index]
        t2 = self.times[index-1]
        slope = (v1-v2) / (t1-t2)
        return slope
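# Worked example of the comparison helpers above: with tol = 0.0555 (the
# value ShiftCorrector uses below), withinPerc(10.3, 10.0, 0.03) tests 10.3
# against [9.7 - 0.0555, 10.3 + 0.0555] and returns True, while
# withinPerc(10.4, 10.0, 0.03) returns False. vecWithinPerc applies the same
# test per component and requires appCount of the 3 components to pass.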
class ShiftCorrector(ShiftHelperFunctions):
    # Operates on the full set of data (3 x N)
    def __init__(self, times, dta, res=None, parent=None, mlst=None):
        self.times = times
        self.data = dta
        self.dtaLen = len(dta[0])
        self.parent = parent
        self.res = times[1] - times[0]
        self.MLST = mlst
        self.skipSet = []
        self.natSet = []
        self.filterKeyMap = {}

        # User-set and default parameters
        self.t0 = 0.5 # First threshold
        self.t1 = 0.3 # Second threshold
        self.avgWin = 5 # Default # of values to avg to left/right of index
        self.compPerc = 0.2 # Frac used when comparing amps
        self.ringCompPerc = 0.35 # Frac used when comparing potential ringing onsets
        self.ringAmpCompPerc = 0.25 # Frac used in place of compPerc for corrections w/ ringing
        self.validityPerc = 0.3 # Frac used when checking a potential correction's validity
        self.refRow = 1 # Vector row used as reference for slope calculations
        self.compoundPerc = 0.1 # Used in checks for compound shifts, see checkForCompound
        self.slopeThresh = 0.05 / self.res # Minimum slope required for slope edge checking
        self.ringSlopeThresh = 0.125 / self.res # Minimum slope when looking for
                                                # onset edges in filtered data
        self.ringEdgeWin = 275 # Number of points to include left/right of ring grps
                               # when using 'smoothed' amp calculations
        self.natSlopeThresh = 0.5 / self.res # How steep slopes must be for onsets
        self.minAmp = 0.3 # Minimum amplitude; below this amount, amplitudes are
                          # treated as being 'similar' for simple shift checks
        self.tol = 0.0555 # Leeway amount used in value comparisons (withinPerc, withinEpsilon)
        ShiftHelperFunctions.__init__(self, times, dta, self.res, self.tol)
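    # Note on units: getSlope returns data units per second (it divides by
    # the time difference), so a threshold written as X / self.res
    # corresponds to a change of X data units over one sample spacing
    # (res seconds).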
    # Skip range helper functions
    def timeGapInRange(self, i, j):
        numPts = j - i
        if abs(self.times[j] - self.times[i]) > ((numPts+1) * self.res):
            return True
        return False

    def ringingInRange(self, i, j):
        # Checks if there are any points w/ signif. ringing in the given range
        if len((set(np.arange(i, j+1)) & set(self.ringSet))) > 0:
            return True
        return False

    def inMLSTSkipRange(self, index):
        # Checks if index is in the range of data that should be skipped
        if self.MLST is not None:
            if self.MLST[index] > 10.5 and self.MLST[index] < 16.8:
                return True
            return False
        else:
            return False

    def checkIfInSkipSet(self, index):
        loc = self.parent.issueLocs['Irregular Shift']
        flagVal = self.parent.qualityVals['Found']
        if self.parent.dqf[index][loc] == flagVal or self.parent.dqf[index][0] == flagVal:
            return True
        return False

    def arangeSkipSet(self, skipSet):
        if len(skipSet) <= 1:
            return
        startIndex = skipSet[0]
        lastIndex = startIndex
        for index in skipSet:
            if index - lastIndex > 1:
                self.pairs.append((startIndex, lastIndex))
                startIndex = index
            lastIndex = index
        self.startIndices = [i0 for (i0, i1) in self.pairs]

    def getDiffBetweenIndices(self, i, j):
        return self.data[:,j] - self.data[:,i]

    def findSpikeInRange(self, i, j, rowLst=None):
        if rowLst is None:
            rowLst = [0,1,2]
        # Looks for a spike-like artifact at each index, for each component,
        # and if there is only one significant spike that has a sufficiently
        # large amplitude, then return its row and the indices that mark its
        # left edge and right edge
        compPerc = 0.35
        index = i
        for k in range(i+1, j):
            count = 0
            refRow = 0
            for row in rowLst:
                lft, rght = self.findSlopeChanges(index, self.data[row], 0)
                diff1 = self.data[row][rght] - self.data[row][lft]
                if abs(diff1) < 0.3:
                    continue
                nextLft, nextRght = self.findSlopeChanges(k, self.data[row], 0)
                diff2 = self.data[row][nextRght] - self.data[row][nextLft]
                sameSign = np.sign(diff1) == np.sign(diff2)
                similarVal = self.withinPerc(diff1, diff2*(-1), compPerc)
                if similarVal and not sameSign:
                    count += 1
                    refRow = row
            if count == 1:
                lft, rght = self.findSlopeChanges(index, self.data[refRow], 0)
                diff1 = self.data[row][rght] - self.data[row][lft]
                if abs(diff1) < 0.7:
                    continue
                nextLft, nextRght = self.findSlopeChanges(k, self.data[refRow], 0)
                return refRow, lft, rght, nextLft, nextRght
        return None

    def findUnusualOnsets(self, onlst):
        oddLst = []
        uniqLst, ampLst = self.getUniqLst(onlst)
        # Look for onsets that have a significant spike in only one component
        # and with small nearby slopes (to make sure it's not part of ringing)
        for z in range(0, len(uniqLst)):
            index = uniqLst[z]
            spikeRes = self.findSpikeInRange(index, index+10)
            if spikeRes is None:
                continue
            refRow, lft, rght, nextLft, nextRght = spikeRes
            diff1 = self.getDiffBetweenIndices(lft, rght)[refRow]
            diff2 = self.getDiffBetweenIndices(nextLft, nextRght)[refRow]
            leftSlopes = np.abs([self.getSlope(i, self.data[refRow]) for i in range(lft-5, lft)])
            rightSlopes = np.abs([self.getSlope(i, self.data[refRow]) for i in range(nextRght+1, nextRght+6)])
            slopeBound = 0.125 / self.res
            if np.mean(leftSlopes) > slopeBound:
                continue

            # Check if nearby left and right averages are close to each other
            # (relative to the difference between the spike's left/right amps)
            # and correct the issue in this component
            diff = diff1 + diff2
            leftAvg = self.getNearbyAvg(lft, self.avgWin, 'left', ofst=2)
            rightAvg = self.getNearbyAvg(nextRght, self.avgWin, 'right', ofst=2)
            if self.withinEpsilon(leftAvg[refRow], rightAvg[refRow], 0.5+abs(diff)) and np.mean(rightSlopes) <= slopeBound:
                self.data[refRow][lft:nextRght+1] = np.tile((leftAvg+rightAvg)[refRow]/2, (nextRght-lft)+1)
                self.parent.setQualityFlag((lft, nextRght), 'Shift', 'Found+Partial')
                oddLst.extend([i for i in range(lft, nextRght+1)])
                continue

            # Otherwise, check for another nearby spike that might make this onset
            # form a pair of close-together spikes in a single component that
            # form a single small shift in the other components
            prevDiff = diff
            for nearIndex in range(nextRght, nextRght+10):
                spikeRes = self.findSpikeInRange(nearIndex, nearIndex+10, [refRow])
                if spikeRes is None:
                    continue
                refRow, lft_near, rght_near, nextLft_near, nextRght_near = spikeRes
                # Add the difference between this spike's left and right edge amps
                # to the overall diff that's used to check if the left and right
                # averages relative to the artifacts are reasonably close to each other
                diff1 = self.data[refRow][rght_near] - self.data[refRow][lft_near]
                diff2 = self.data[refRow][nextRght_near] - self.data[refRow][nextLft_near]
                diff = prevDiff + diff1 + diff2
                rghtAvg = self.getNearbyAvg(nextRght_near, self.avgWin, 'right', ofst=2)
                if self.withinEpsilon(leftAvg[refRow], rightAvg[refRow], 0.5 + abs(diff)):
                    rightSlopes = np.abs([self.getSlope(i, self.data[refRow]) for i in range(nextRght_near+1, nextRght_near+6)])
                    if np.mean(rightSlopes) > slopeBound:
                        continue
                    self.data[refRow][lft:nextRght_near+1] = np.tile((leftAvg+rightAvg)[refRow]/2, (nextRght_near-lft)+1)
                    self.parent.setQualityFlag((lft, nextRght_near), 'Shift', 'Found+Partial')
                    oddLst.extend([i for i in range(lft, nextRght_near+1)])
        return oddLst
    def initNaturalSlopeSet(self, t0, t1, s0, numThresh=5):
        # Tries to identify onsets that may be actual variations in data
        # by checking the slopes and the number of points that make up the onset
        # s0 = maximum slope a suspected 'natural onset' can take
        # numThresh = number of points that must make up the slope/onset
        onlst = list(self.findOnsetsInWindow(0, self.dtaLen-1, t0-0.1, t1-0.1, False))
        susPts = []
        susDict = {}
        fullPts = []
        for index in onlst:
            for currRow in [1, 2]:
                lft, rght = self.findSlopeChanges(index, self.data[currRow], self.slopeThresh)
                if lft == rght:
                    slope = self.getSlope(index+1, self.data[currRow])
                    lft = index
                    rght = index + 1
                else:
                    slope = (self.data[currRow][rght] - self.data[currRow][lft]) / (self.times[rght] - self.times[lft])

                # First check general slope
                numPts = rght - lft
                if (abs(slope) > s0 and numPts < 8) or numPts < 3:
                    continue

                # Check edge slopes to make sure they're also steep
                if numPts > 5:
                    edgeSlopes = [abs(self.getSlope(i, self.data[currRow])) for i in [lft-1, lft, rght-1, rght]]
                    centerIndex = int((lft+rght)/2)
                    centerSlope = abs(self.getSlope(centerIndex, self.data[currRow]))
                    centerSlopes = np.array([self.getSlope(i, self.data[currRow]) for i in range(centerIndex-1, centerIndex+1+1)])
                    # Check if edge slopes are very small or all center slopes are < s0
                    centerMask = centerSlopes < s0
                    mask = np.array(edgeSlopes) < (centerSlope / 3)
                    if True in mask or (False not in centerMask):
                        lft, rght = self.findSlopeChanges(index, self.data[currRow], 0)
                        fullPts.extend([i for i in range(lft, rght)])
                        continue
                else:
                    # Similar to above but for onsets made up of 3-5 points
                    lftSlope = abs(self.getSlope(lft+1, self.data[currRow]))
                    rghtSlope = abs(self.getSlope(rght, self.data[currRow]))
                    innerSlope = abs(self.getSlope(int((lft+rght)/2), self.data[currRow]))
                    fracInnerSlope = innerSlope / 3
                    if lftSlope < fracInnerSlope and rghtSlope < fracInnerSlope and innerSlope < s0:
                        lft, rght = self.findSlopeChanges(index, self.data[currRow], 0)
                        fullPts.extend([i for i in range(lft, rght)])
                        continue
        fullPts = list(set(fullPts))
        fullPts.sort()
        return fullPts

    def markSkipRange(self, indexRange):
        a, b = indexRange
        self.parent.setQualityFlag((a, b), 'Other', 'Found')

    def checkIfTypicalRingSet(self, lft, rght, lftInner, rghtInner):
        innerSimilar = False
        innerLftAvg = self.getNearbyAvg(lftInner, self.avgWin, 'right')
        innerRghtAvg = self.getNearbyAvg(rghtInner, self.avgWin, 'left')
        if not self.vecWithinEpsilon(innerLftAvg, innerRghtAvg, 0.75, 2):
            return False
        leftEdge = False
        rightEdge = False
        leftAvg = self.getNearbyAvg(lft, self.avgWin, 'left')
        rightAvg = self.getNearbyAvg(rght, self.avgWin, 'right')
        leftDiff = innerLftAvg - leftAvg
        rightDiff = rightAvg - innerRghtAvg
        leftMask = (abs(leftDiff) > 2)
        rightMask = (abs(rightDiff) > 2)
        leftCount, rightCount = 0, 0
        for row in range(0, 3):
            if leftMask[row] == False:
                leftCount += 1
            if rightMask[row] == False:
                rightCount += 1
        if leftCount > 1 or rightCount > 1:
            return False
        diff = rightDiff + leftDiff
        if self.vecWithinEpsilon(diff, np.zeros(3), 0.5):
            return True
        return False

    # Ringing identification / shift correction driving algorithms
    def initRingSet(self, t0, t1):
        self.skipSet = []
        self.natSet = []
        self.filterKeyMap = {}

        # Initialize ring groups, ring set, and smoothed amplitudes
        self.ringSet = []
        self.ringGrps = []
        self.avgAmps = {}
        self.adjustedAmps = []
        self.filterDta = {}
        self.pairs = []
        self.startIndices = []
        onlst = list(self.findOnsetsInWindow(0, self.dtaLen-1, t0-0.05, t1-0.05, False))
        if len(onlst) == 0:
            return
        self.ringSet, self.ringGrps = self.findRingGrps(onlst)
        self.oddSet = self.findUnusualOnsets(onlst)
        self.natSet = self.initNaturalSlopeSet(t0, t1, self.natSlopeThresh, self.avgWin)
        for index in self.natSet:
            self.parent.dqf[index][0] = self.parent.qualityVals['Found']
        self.filterDtaDict = {}
        self.filterKeyMap = {}
        self.smoothRingingAmps(self.ringGrps, t0, t1, True)
        skipList = []
        skipGrps = []
        for grp in self.ringGrps:
            lft, rght = grp[0], grp[-1]
            if not self.checkIfCorrectable(lft, rght, t0, t1) or lft in self.skipSet:
                a, b, c, d = self.findShiftStartEnd(lft, rght, self.data[self.refRow])
                self.markSkipRange((a,b))
                if self.checkIfTypicalRingSet(a, b, c, d):
                    self.parent.setQualityFlag((a, b), 'Compound', 'Found')
                continue
        for grp in self.ringGrps:
            for i in range(0, 3):
                lft, rght = grp[0], grp[-1]
                if (lft, rght) in skipGrps or self.checkIfInSkipSet(lft):
                    break
                edgeWin = self.ringEdgeWin
                prevArr = np.array(self.filterDtaDict[(lft,rght)])
                num = rght+edgeWin-(lft-edgeWin)
                # if pcFlag:
                #     plt.clf()
                #     plt.plot(self.times[lft-500:rght+500], self.data[1][lft-500:rght+500])
                editRange = self.ringCorrect(lft, rght, num, t0, t1)
                # if pcFlag:
                #     plt.plot(self.times[lft-500:rght+500], self.data[1][lft-500:rght+500])
                #     plt.plot(self.times[lft-self.ringEdgeWin:rght+self.ringEdgeWin], prevArr[1])
                #     plt.show()
                if editRange is not None:
                    skipList.append(editRange)
                else:
                    break
        for a, b in skipList:
            if self.checkIfRingingCorrected(a, b):
                for index in range(a-self.avgWin, b+self.avgWin):
                    self.parent.dqf[index][0] = self.parent.qualityVals['Found']
    def checkIfRingingCorrected(self, a, b):
        if b - a < 75:
            return False
        leftAvg = self.getNearbyAvg(a, self.avgWin, 'left')
        rightAvg = self.getNearbyAvg(b, self.avgWin, 'right')
        centerAvg = self.getNearbyAvg(int((a+b)/2), self.avgWin, 'right', ofst=0)
        leftInner = self.getNearbyAvg(a, self.avgWin, 'right')
        rightInner = self.getNearbyAvg(b, self.avgWin, 'left')
        diff = rightInner - leftInner
        if self.vecWithinEpsilon(leftAvg, rightAvg, 1.25, 3) and self.vecWithinEpsilon(centerAvg, leftAvg, 1.25, 3):
            return True
        if not self.vecWithinEpsilon(leftAvg, rightAvg-diff, 1.25, 3):
            return False
        if not self.vecWithinEpsilon(leftAvg, centerAvg-(diff/2), 1.25, 3):
            return False
        return True

    def findRingGrps(self, onsets, numThresh=3):
        ringGrps = []
        ringingSet = []
        prevSet = []
        currSet = []
        i = 0
        onsets, amps = self.getUniqLst(onsets)
        while i < len(onsets):
            j = i + 1
            amp_i = amps[i]
            lastOnset = onsets[i]
            last_amp = amp_i
            while j < len(onsets):
                index_j = onsets[j]
                if index_j - lastOnset > 4:
                    break
                amp_j = amps[j]
                if not self.vecWithinPerc(np.abs(amp_j), np.abs(last_amp), self.ringCompPerc, 2):
                    break
                lastOnset = index_j
                last_amp = amp_j
                j += 1
            currSet = onsets[i:j]
            if len(currSet) < 2:
                if prevSet != [] and abs(currSet[0]-prevSet[-1]) > 8 and len(prevSet) > 2:
                    ringingSet.extend(self.extendSet(prevSet))
                    ringGrps.append(prevSet)
                    prevSet = currSet
                i = i + 1
                continue
            if len(prevSet) > 1 and abs(currSet[0]-prevSet[-1]) < 8:
                tempPrev = self.extendSet(prevSet)
                tempNext = self.extendSet(currSet)
                prevAvg = np.mean(self.data[:,tempPrev[0]:tempPrev[-1]], axis=1)
                nextAvg = np.mean(self.data[:,tempNext[0]:tempNext[-1]], axis=1)
                if self.vecWithinEpsilon(nextAvg, prevAvg, 2):
                    prevSet.extend(currSet)
                else:
                    if len(prevSet) > numThresh:
                        prevSet = self.extendSet(prevSet)
                        ringingSet.extend(prevSet)
                        ringGrps.append(prevSet)
                    prevSet = currSet
            else:
                if len(prevSet) > numThresh:
                    prevSet = self.extendSet(prevSet)
                    a, b = prevSet[0], prevSet[-1]
                    ringingSet.extend(prevSet)
                    ringGrps.append(prevSet)
                prevSet = currSet
            i = max(j, i + 1)
        return ringingSet, ringGrps

    def extendSet(self, ringSet):
        left, right = ringSet[0], ringSet[-1]
        otherLft, rght = self.findSlopeChanges(right, self.data[self.refRow])
        return np.arange(start=ringSet[0], stop=rght, step=1)

    def getPeakValues(self, i, j, direc):
        topPeaks = []
        bottomPeaks = []
        start = i
        count = 0
        while start <= self.dtaLen and count < j:
            lft, rght = self.findSlopeChanges(start, self.data[self.refRow])
            if lft == rght:
                lft = start
                rght = start + 1
            slopeSign = np.sign(self.getSlope(lft+1, self.data[self.refRow]))
            if slopeSign < 0:
                bottomPeaks.append(self.data[:,rght])
                topPeaks.append(self.data[:,lft])
                # print ('BTM', rght)
            else:
                bottomPeaks.append(self.data[:,lft])
                topPeaks.append(self.data[:,rght])
                # print ('TOP', lft)
            count += 1
            if direc == 'right':
                start = max(rght, start+1)
            else:
                start = min(start-1, lft-1)
        return topPeaks, bottomPeaks

    def getRingingAvgs(self, i, j, direc):
        topPeaks, bottomPeaks = self.getPeakValues(i, j, direc)
        numPairs = min(len(topPeaks), len(bottomPeaks))
        avg = np.mean(np.stack(topPeaks[:numPairs]+bottomPeaks[:numPairs]).T, axis=1)
        return avg
    def adjustSmoothDta(self, smoothDta, indexRange, offset):
        a, b = indexRange
        # Find the ring groups contained within this region
        rngIndices = []
        for grp in self.ringGrps:
            strtRing, endRing = grp[0], grp[-1]
            if strtRing > a and endRing < b:
                rngIndices.append((strtRing, endRing))

        # Check each ring region to see if there is a signif. variation in values
        for x, y in rngIndices:
            x_of = x - offset
            y_of = y - offset
            # Find the relative start and end of the ringing region within
            # the context of the filtered data
            lftStrt, lftEnd = self.findSlopeChanges(x_of, smoothDta[self.refRow])
            rghtStrt, rghtEnd = self.findSlopeChanges(y_of, smoothDta[self.refRow])
            lft, rght = lftEnd, rghtStrt
            win = min(y-x, 6)
            lftAvg = self.getNearbyAvg(lft+offset, win, 'right', ofst=0)
            rghtAvg = self.getNearbyAvg(rght+offset, win, 'left', ofst=0)
            lftAvg = self.getRingingAvgs(lft+offset, 3, 'right')
            rghtAvg = self.getRingingAvgs(rght+offset, 3, 'left')

            # If the edges of the ringing region aren't standard
            # (i.e. a change in level or average amplitude at the start/end)
            # then skip this region
            leftSlopes = [self.getSlope(i, self.data[self.refRow]) for i in range(lft+offset, lft+offset+5)]
            rightSlopes = [self.getSlope(i, self.data[self.refRow]) for i in range(rght+offset-5, rght+offset)]
            if np.mean(np.abs(leftSlopes)) < 0.1 / self.res or np.mean(np.abs(rightSlopes)) < 0.1 / self.res:
                continue

            # If the regions on both sides of the ringing region are not affected
            # by ringing, denoting a small shift with only ringing inside it,
            # then skip this region
            leftSlopes = [self.getSlope(i, self.data[self.refRow]) for i in range(lft+offset-5, lft+offset)]
            rightSlopes = [self.getSlope(i, self.data[self.refRow]) for i in range(rght+offset, rght+offset+5)]
            if np.mean(np.abs(leftSlopes)) < 0.1 / self.res and np.mean(np.abs(rightSlopes)) < 0.1 / self.res:
                continue

            if y - x < 8:
                continue

            if lft == rght:
                otherLft, rght = self.findSlopeChanges(rght, smoothDta[self.refRow])
            tempLft, tempRght = lft, rght
            lft = min(tempLft, tempRght)
            rght = max(tempLft, tempRght)

            # Get the minimum and maximum values within the filtered data for this region
            minVal = np.stack([min(smoothDta[row][lft:rght]) for row in range(0, 3)])
            maxVal = np.stack([max(smoothDta[row][lft:rght]) for row in range(0, 3)])

            # If the region is small or the variation is small, move to the next region
            if y - x < 12 and self.vecWithinEpsilon(minVal, maxVal, 0.75, 2):
                continue
            if self.vecWithinEpsilon(minVal, maxVal, 0.35, 2):
                continue

            # Otherwise interpolate between the leftAvg and rightAvg and
            # update the filtered data with this new set of values
            startVal = lftAvg
            endVal = rghtAvg
            for row in range(0, 3):
                if self.withinEpsilon(startVal[row], endVal[row], 0.25):
                    avgVal = (lftAvg[row] + rghtAvg[row]) / 2
                    startVal[row] = avgVal
                    endVal[row] = avgVal
            newDta = np.stack([np.interp(self.times[lft+offset:rght+offset+1],
                self.times[[lft+offset, rght+offset]],
                [startVal[row], endVal[row]]) for row in range(0, 3)])
            smoothDta[:,lft:rght+1] = newDta

            # Final adjustment so this region is not counted as one big onset
            # starting from the shift up/down prior to the ringing grp
            newLft, newRght = self.findSlopeChanges(lft+1, smoothDta[self.refRow], 0)
            if newLft == newRght:
                continue
            if newLft < lft - 3:
                sign = np.sign(self.getSlope(lft, smoothDta[self.refRow]))
                diff = smoothDta[:,lft] - smoothDta[:,lft-1]
                if sign > 0:
                    smoothDta[:,lft] = smoothDta[:,lft] - diff - self.tol
                else:
                    smoothDta[:,lft] = smoothDta[:,lft] + diff + self.tol
            if newRght > rght + 3:
                sign = np.sign(self.getSlope(rght, smoothDta[self.refRow]))
                diff = smoothDta[:,rght] - smoothDta[:,rght+1]
                if sign > 0:
                    smoothDta[:,rght] = smoothDta[:,rght] - diff - self.tol
                else:
                    smoothDta[:,rght] = smoothDta[:,rght] + diff + self.tol
        return smoothDta
    def smoothRingingAmps(self, ringGrps, t0, t1, smoothRinging=False):
        edgeWin = self.ringEdgeWin
        # For each group in the ringGrps
        for grp in ringGrps:
            lft, rght = grp[0], grp[-1]
            # Apply filter to region of data starting before and ending after the
            # ring grp's start and end points respectively
            avgDta = []
            for r in range(0, 3):
                rowDta = signal.savgol_filter(self.data[r][lft-edgeWin:rght+edgeWin], 3, 1)
                rowDta = signal.savgol_filter(rowDta, 3, 1)
                avgDta.append(rowDta)
            avgDta = np.array(avgDta)
            if smoothRinging:
                avgDta = self.adjustSmoothDta(avgDta, (lft-self.ringEdgeWin, rght+self.ringEdgeWin), lft-self.ringEdgeWin)

            # Store filter data
            self.filterDtaDict[(lft, rght)] = avgDta

            # Calculate amplitudes for indices in filtered data and store them and
            # relevant info in class' dictionaries
            for i in range(25, len(rowDta)-25):
                lft_i, rght_i = self.findSlopeChanges(i, avgDta[self.refRow], self.ringSlopeThresh)
                if lft_i <= 10 or rght_i >= edgeWin*2+(rght-lft)-10:
                    continue
                lft_i = max(0, lft_i)
                rght_i = min(rght_i, len(rowDta))
                if lft_i == rght_i or (rght_i-lft_i == 1 and abs(avgDta[:,rght_i] - avgDta[:,lft_i])[self.refRow]/self.res < self.ringSlopeThresh):
                    self.avgAmps[lft-edgeWin+i] = np.zeros(3)
                else:
                    self.avgAmps[lft-edgeWin+i] = avgDta[:,rght_i] - avgDta[:,lft_i]
                self.filterKeyMap[lft-edgeWin+i] = (lft, rght)
    def ringCorrect(self, lft, rght, num, t0, t1):
        lftBnd = lft - self.ringEdgeWin
        rghtBnd = rght + self.ringEdgeWin

        # Estimate the point where the ring shift has maximum value
        maxBand = int(self.ringEdgeWin / 4)
        maxIndex = lft-maxBand
        lastMax = rght-maxBand
        maxAverage = self.data[:,maxIndex]
        lastAvg = maxAverage
        win = 3
        for z in range(lft-maxBand, rght+maxBand):
            if z > lft+win and z < rght-win: # Skip ringing range
                continue
            currAvg = np.mean(self.data[:,z-win:z+win+1], axis=1)
            if maxAverage[self.refRow] < currAvg[self.refRow]:
                lastAvg = maxAverage
                lastMax = maxIndex
                maxIndex = z
                maxAverage = currAvg
        leftAvg, rightAvg = self.getEdgeAvgs(maxIndex, self.avgWin)

        # Adjust max index in case outliers skew the max from the middle range
        if leftAvg[self.refRow] < maxAverage[self.refRow]-0.1 and self.vecWithinPerc(maxAverage, rightAvg, self.compPerc, 2):
            maxIndex += 2
        elif rightAvg[self.refRow] < maxAverage[self.refRow]-0.1 and self.vecWithinPerc(maxAverage, leftAvg, self.compPerc, 2):
            maxIndex -= 2

        # Get list of onsets in window
        onsetLst = list(self.findOnsetsInWindow(lft-self.ringEdgeWin, rght+self.ringEdgeWin, t0, t1))
        if onsetLst == []:
            return None
        uniqLst, fullAmpLst = self.getUniqLst(onsetLst)

        # Remove any spike-like shifts from the onset list and amp list
        z = 1
        lstLen = len(uniqLst)
        while z < lstLen:
            i, j = uniqLst[z-1], uniqLst[z]
            amp_i = fullAmpLst[z-1]
            amp_j = fullAmpLst[z]
            if abs(j-i) < 10 and self.compareAmps(amp_i, amp_j, self.compPerc):
                leftAvg = self.getNearbyAvg(i, self.avgWin, 'left')
                rightAvg = self.getNearbyAvg(j, self.avgWin, 'right')
                if self.vecWithinEpsilon(leftAvg, rightAvg, 2):
                    for c in range(0, 2):
                        uniqLst.pop(z-1)
                        fullAmpLst.pop(z-1)
                        lstLen -= 1
                    continue
            z += 1

        # Break full onset list where max value is found
        midPoint = bisect.bisect(uniqLst, maxIndex)
        if midPoint == 0 or midPoint == len(uniqLst):
            return None
        # if pcFlag:
        #     for index in uniqLst:
        #         plt.plot(self.times[index], self.data[1][index], 'o')
        #         plt.annotate(str(index), (self.times[index], self.data[1][index]))

        # Look for single shifts and compound shifts beginning from same start index
        win = 3
        ringPerc = 0.125
        ringShiftPerc = 0.225
        for j in range(0, midPoint+1):
            currOnset = uniqLst[j]
            # Skip starting points that start in the middle of something else
            lftEdge, rghtEdge = self.findRelativeSlopeChanges(currOnset)
            leftAvg = self.getNearbyAvg(lftEdge, win, 'left', ofst=0)
            rightAvg = self.getNearbyAvg(rghtEdge, win, 'right', ofst=0)
            if currOnset not in self.filterKeyMap and not self.vecWithinPerc(fullAmpLst[j], rightAvg-leftAvg, self.ringAmpCompPerc, 2):
                continue

            # Look for a compound shift starting at this index
            simpleShift = None
            compRes, compLst, compSum = self.checkForCompound(uniqLst[j:], fullAmpLst[j:], False,
                smoothDta=self.filterDtaDict[(lft, rght)], perc=ringPerc)

            # Then look for potential simple shifts
            for k in range(midPoint+1, len(uniqLst)):
                sAmp = fullAmpLst[j]
                eAmp = fullAmpLst[k]
                if self.checkForSimpleShift(uniqLst[j], uniqLst[k], sAmp, eAmp, ringShiftPerc, uniqLst[j+1:k]):
                    simpleShift = k
                    break

            # If both are found, choose the one w/ the smallest diff
            if compRes and simpleShift is not None:
                ampDiff = abs(sAmp-eAmp*(-1))
                sumNorm = self.vecNorm(compSum/(len(compLst)-1))
                ampNorm = self.vecNorm(ampDiff)
                if sumNorm <= ampNorm:
                    simpleShift = None
                else:
                    compRes = False

            # Correct and return the min/max edited values
            if compRes:
                self.correctCompound(compLst, smoothDta=self.filterDtaDict[(lft,rght)])
                return compLst[0], compLst[-1]
            elif simpleShift is not None:
                self.correctSingleShift(uniqLst[j], uniqLst[simpleShift], sAmp, eAmp*(-1),
                    smoothDta=self.filterDtaDict[(lft,rght)])
                return uniqLst[j], uniqLst[simpleShift]

    # Uncorrectable ringing shift identifying / state updating algorithms
    def checkIfCorrectable(self, lft, rght, t0, t1):
        # Tries to identify ringing shifts with a small sudden shift in the
        # middle of a ringing set that results in a small level area that
        # appears to be similar to and level with non-ringing shifted data
        trueLft, trueRght = lft, rght
        win = 10
        onsets = list(self.findOnsetsInWindow(lft-win, rght+win, t0, t1))
        if len(onsets) < 2:
            return True
        onsets, ampLst = self.getUniqLst(onsets)
        ringAvg = np.mean(np.abs([self.getSlope(i, self.data[self.refRow]) for i in range(lft, rght)]))
        count = 0
        thresh = ringAvg / 2

        # Look for a pair of onsets that satisfy the single shift properties
        # (amps, signs are somewhat reasonable) and examine the slopes at the
        # edges and in-between to identify if it is a small shift in the ringing
        # that shifts up a set of data that isn't ringing
        for x in range(0, len(onsets)):
            index_a = onsets[x]
            amp_a = ampLst[x]
            sign_a = np.sign(amp_a[self.refRow])
            for y in range(x+1, len(onsets)):
                index_b = onsets[y]
                amp_b = ampLst[y]
                sign_b = np.sign(amp_b[self.refRow])
                if sign_b == sign_a or index_b - index_a < 4 or index_b - index_a > 40:
                    continue
                if not self.compareAmps(amp_a, amp_b, self.validityPerc):
                    continue
                lft, rght = self.findRelativeSlopeChanges(index_a)
                otherLft, otherRght = self.findRelativeSlopeChanges(index_b)
                if rght == otherLft:
                    return False
                innerSlopes = [self.getSlope(i, self.data[self.refRow]) for i in range(rght, otherLft)]
                leftSlopes = [self.getSlope(i, self.data[self.refRow]) for i in range(lft-5, lft)]
                rightSlopes = [self.getSlope(i, self.data[self.refRow]) for i in range(otherRght, otherRght+5)]
                leftAvg = np.mean(np.abs(leftSlopes))
                rightAvg = np.mean(np.abs(rightSlopes))
                # Inner slopes are very small, non-ringing
                if np.mean(np.abs(innerSlopes)) < thresh:
                    count += 1
                    break
                # Both edges have small slopes, not part of ringing set
                elif leftAvg < thresh and rightAvg < thresh:
                    continue
                # One edge has ringing while the other does not
                elif leftAvg < thresh or rightAvg < thresh:
                    count += 1
                    break
            if count > 0:
                return False
        return True

    def checkStartEndForSpike(self, uniqLst):
        start, stop = uniqLst[0], uniqLst[-1]
        lft, rght = self.findSlopeChanges(start, self.data[self.refRow])
        if self.findSpikeInRange(lft-10, rght+10):
            return True
        lft, rght = self.findSlopeChanges(stop, self.data[self.refRow])
        if self.findSpikeInRange(lft-10, rght+10):
            return True
        return False
    def findShiftStartEnd(self, lft, rght, dta):
        # Estimate the region where the potential shift is at its max
        # and the outer edges where the shift starts/ends by looking for points
        # where the avg slope levels off and starts varying highly again
        win = self.ringEdgeWin
        a, b = lft-win, rght+win
        avgWin = 15
        slopeBound = 0.1
        diffDta = np.abs(np.diff(dta))
        leftStart = lft
        for i in range(lft, a+avgWin, -1):
            if self.timeGapInRange(i-avgWin, i+avgWin):
                break
            avgSlope = np.mean(diffDta[i-avgWin:i+avgWin])
            if self.withinEpsilon(avgSlope, 0, slopeBound):
                leftStart = i
                break
        lftInner = rght
        for i in range(rght, b-avgWin):
            if self.timeGapInRange(i-avgWin, i+avgWin):
                break
            avgSlope = np.mean(diffDta[i-avgWin:i+avgWin])
            if self.withinEpsilon(avgSlope, 0, slopeBound):
                lftInner = i
                break

        # Direction of next search depends on whether the inner value is larger
        # or the outer value is larger, representing the center/max level
        # of the shift
        leftStartVal = self.data[self.refRow][leftStart]
        lftInnerVal = self.data[self.refRow][lftInner]
        comp = lftInnerVal > leftStartVal
        if comp:
            rghtInner = lftInner
            for i in range(lftInner, b-avgWin):
                if self.timeGapInRange(i-avgWin, i+avgWin):
                    break
                avgSlope = np.mean(diffDta[i-avgWin:i+avgWin])
                if not self.withinEpsilon(avgSlope, 0, slopeBound):
                    rghtInner = i
                    break
            rightEnd = rghtInner
            for i in range(rghtInner, b-avgWin):
                if self.timeGapInRange(i-avgWin, i+avgWin):
                    break
                avgSlope = np.mean(diffDta[i-avgWin:i+avgWin])
                if self.withinEpsilon(avgSlope, 0, slopeBound):
                    rightEnd = i
                    break
        else:
            rghtInner = leftStart
            rightEnd = lftInner
            lftInner = rghtInner
            for i in range(rghtInner, a+avgWin, -1):
                if self.timeGapInRange(i-avgWin, i+avgWin):
                    break
                avgSlope = np.mean(diffDta[i-avgWin:i+avgWin])
                if not self.withinEpsilon(avgSlope, 0, slopeBound):
                    lftInner = i
                    break
            leftStart = lftInner
            for i in range(leftStart, a+avgWin, -1):
                if self.timeGapInRange(i-avgWin, i+avgWin):
                    break
                avgSlope = np.mean(diffDta[i-avgWin:i+avgWin])
                if self.withinEpsilon(avgSlope, 0, slopeBound):
                    leftStart = i
                    break
        return leftStart, rightEnd, lftInner, rghtInner

    # State update functions
    def ringUpdate(self, i, j, smoothDta=False):
        affectedGrps = []
        for grp in self.ringGrps:
            lft, rght = grp[0], grp[-1]
            if len(set(np.arange(lft-self.ringEdgeWin, rght+self.ringEdgeWin)) & set(np.arange(i, j))) > 0:
                affectedGrps.append(grp)
        self.smoothRingingAmps(affectedGrps, self.t0, self.t1, smoothDta)

    def markCorrection(self, correctType, i, j):
        flagType = 'Found+Corrected'
        ringingCorrection = self.ringingInRange(i, j)
        if ringingCorrection:
            self.ringUpdate(i, j, True)
            flagType = 'Found+Full_Partial'
        self.parent.setQualityFlag((i, j), correctType, flagType)
        self.lastCorrection = (i, j)

    # Onset detection and amplitude calculation functions
    def checkIfOnset(self, arr, firstThresh=0.5, secThresh=0.3):
        # Checks if at least one value is greater than firstThresh
        # and another value is greater than secThresh
        res = False
        firstMet = False
        firstIndex = None
        for i in range(0, 3):
            if arr[i] > firstThresh:
                firstMet = True
                firstIndex = i
                break
        if firstMet:
            mask = arr > secThresh
            mask[firstIndex] = False
            if True in mask:
                return True
        return False
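    # Example: with the default thresholds, checkIfOnset on the component
    # amplitudes [0.6, 0.35, 0.1] returns True (0.6 > 0.5 in one component
    # and 0.35 > 0.3 in another), while [0.6, 0.2, 0.1] returns False.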
    def checkIfFilteredOnset(self, i, firstThresh, secThresh):
        # Criteria for detecting onsets in smoothed data
        amp = self.avgAmps[i]
        lftKey, rghtKey = self.filterKeyMap[i]
        smoothDta = self.filterDtaDict[self.filterKeyMap[i]]
        lft, rght = self.findSlopeChanges(i-lftKey+self.ringEdgeWin, smoothDta[self.refRow], self.ringSlopeThresh)
        win = self.avgWin
        leftAvg = self.getNearbyAvg(lft, 3, 'left', dta=smoothDta, ofst=0)
        rightAvg = self.getNearbyAvg(rght, 3, 'right', dta=smoothDta, ofst=0)
        avgAmp = rightAvg - leftAvg
        refLft, refRght = lft + lftKey - self.ringEdgeWin, rght + lftKey - self.ringEdgeWin
        leftSlopes = [self.getSlope(i, self.data[self.refRow]) for i in range(refLft-10, refLft)]
        rightSlopes = [self.getSlope(i, self.data[self.refRow]) for i in range(refRght, refRght+10)]
        leftSlopeAvg = np.mean(np.abs(leftSlopes))
        rightSlopeAvg = np.mean(np.abs(rightSlopes))

        # If slopes to the left and right of the onset's start and end are small,
        # indicating that this is the sole onset nearby, then check if there
        # is a corresponding onset in the un-filtered data and update accordingly
        if leftSlopeAvg < 0.15 / self.res and rightSlopeAvg < 0.15 / self.res:
            newLft, newRght = self.findSlopeChanges(i, self.data[self.refRow])
            leftVal, rightVal = self.data[:,newLft], self.data[:,newRght]
            slope = (rightVal[self.refRow] - leftVal[self.refRow]) / (self.times[newRght] - self.times[newLft])
            valid = True
            if newRght - newLft > 6 and abs(slope) < 0.5 / self.res:
                valid = False
            if newRght - newLft > 4 and abs(slope) < 0.3 / self.res:
                valid = False
            if valid and self.vecWithinEpsilon(leftVal, leftAvg, self.validityPerc, 2) and self.vecWithinEpsilon(rightVal, rightAvg, self.validityPerc, 2):
                newAmp = rightVal - leftVal
                if self.checkIfOnset(np.abs(newAmp), firstThresh, secThresh):
                    newAmp = self.getNearbyAvg(newRght, 3, 'right') - self.getNearbyAvg(newLft, 3, 'left')
                    for index in range(newLft, newRght):
                        if index in self.avgAmps:
                            self.avgAmps[index] = newAmp
                            self.adjustedAmps.append(index)
                    for index in range(newLft - 5, newLft):
                        if index in self.avgAmps:
                            self.avgAmps[index] = np.zeros(3)
                            self.adjustedAmps.append(index)
                    for index in range(newRght, newRght+5):
                        if index in self.avgAmps:
                            self.avgAmps[index] = np.zeros(3)
                            self.adjustedAmps.append(index)
                    if i >= newLft and i < newRght:
                        return True
            return False

        # Make sure both the filtered data and the underlying unfiltered data
        # exhibit the onset property
        offsetVal = i-lftKey+self.ringEdgeWin
        diffVec = smoothDta[:,offsetVal+1] - smoothDta[:,offsetVal-1]
        otherDiffVec = self.getDiffVec(i)
        if not self.checkIfOnset(np.abs(diffVec), firstThresh, secThresh) and not self.checkIfOnset(np.abs(otherDiffVec), firstThresh, secThresh):
            return False

        # Make sure the averaged amp and regular amp are somewhat within range of each other
        if not self.vecWithinPerc(amp, avgAmp, 0.5, 2):
            return False
        diffVec = smoothDta[:,i-lftKey+self.ringEdgeWin+1] - smoothDta[:,i-lftKey+self.ringEdgeWin-1]

        # Make sure avgAmp satisfies the onset property
        if self.checkIfOnset(np.abs(avgAmp), firstThresh, secThresh) and self.checkIfOnset(np.abs(diffVec), firstThresh, secThresh) and self.checkIfOnset(np.abs(amp), firstThresh, secThresh):
            return True
        elif self.checkIfOnset(np.abs(amp), firstThresh, secThresh):
            # Otherwise, if amp satisfies the onset property and there is an index
            # along the onset's slope that has a similar amplitude in the
            # unfiltered data, yield this index
            tempIndex = self.adjustIndex(i)
            if i != tempIndex and i not in self.ringSet:
                lft, rght = self.findRelativeSlopeChanges(i)
                leftAvg = self.getNearbyAvg(lft, 3, 'left', ofst=0)
                rightAvg = self.getNearbyAvg(rght, 3, 'right', ofst=0)
                if self.vecWithinPerc(amp, rightAvg-leftAvg, self.ringAmpCompPerc, 2):
                    return True
        return False

    def findOnsetsInWindow(self, sI, eI, firstThresh, secThresh, breakAtTimeGaps=True):
        maxIndex = min(self.dtaLen - 1, eI)
        for i in range(sI, maxIndex):
            if self.inMLSTSkipRange(i) or self.checkIfInSkipSet(i) or i in self.natSet:
                continue
            # Skip onsets near ringing where the amp is not similar to
            # the amplitude calculated from averages
            if i in self.filterKeyMap:
                if self.checkIfFilteredOnset(i, firstThresh, secThresh):
                    yield i
                continue
            vec = self.getDiffVec(i)
            if self.checkIfOnset(np.abs(vec), firstThresh, secThresh):
                yield i

    def _calcAvgAmp(self, index, avgWin, smoothDta=None):
        if smoothDta is not None and index in self.filterKeyMap:
            lftBase, rghtBase = self.filterKeyMap[index]
            refIndex = index - lftBase + self.ringEdgeWin
            preAvg = self.getNearbyAvg(refIndex, avgWin, 'left', ofst=2, dta=smoothDta)
            postAvg = self.getNearbyAvg(refIndex, avgWin, 'right', ofst=2, dta=smoothDta)
            return postAvg-preAvg

        # Calculate the amplitude as the difference between the averages
        # to the left and right of the index
        lftStrt, lftEnd = self.getWindow(index, avgWin, 'left', ofst=2)
        preAvg = np.mean(self.data[:,lftStrt:lftEnd+1], axis=1)
        rghtStrt, rghtEnd = self.getWindow(index, avgWin, 'right', ofst=2)
        postAvg = np.mean(self.data[:,rghtStrt:rghtEnd+1], axis=1)
        amp = postAvg - preAvg
        return amp

    def calcAmplitude(self, index, avgWin, forceTrue=False):
        if index in self.avgAmps.keys() and not forceTrue:
            return self.avgAmps[index]

        # Calculate amplitude as the difference between the values to the
        # left and right of index where the slope changes sign
        newAmp = []
        maxRow = 1
        dta = self.data[maxRow]
        sI, eI = self.findSlopeChanges(index, dta)
        if sI == eI:
            # If index represents a peak, use the right value diff
            eI = index + 1
        for row in range(0, 3):
            # Get the value difference between the next/previous peaks
            dta = self.data[row]
            valDiff = dta[eI] - dta[sI]
            # Amplitude calculated from right to left
            newAmp.append(valDiff)
        return np.array(newAmp)

    def compareAmps(self, startAmp, endAmp, perc):
        # If amplitudes are similar enough by perc (or are both very small)
        # and the signs are different for at least 2 rows, return True
        validItems = 0
        for row in range(0, 3):
            startVal, endVal = startAmp[row], endAmp[row]
            validVal = self.withinPerc(np.abs(startVal), np.abs(endVal), perc)
            if abs(startVal) < self.minAmp and abs(endVal) < self.minAmp:
                validVal = True
            validSign = np.sign(startVal) != np.sign(endVal)
            if validVal and validSign:
                validItems += 1
        if validItems >= 2:
            return True
        return False

    # Other important helper functions
    def getWindow(self, index, window, direction='left', ofst=2, dta=None):
        if dta is None:
            dta = self.data
        # Get start/stop indices for left/right windows relative to an index
        if direction == 'left':
            # Bound left indices by 0
            start = max(0, index-window-ofst)
            stop = max(0, index-ofst)
            return start, stop
        else:
            # Bound right indices by length of data
            start = min(len(dta[0])-1, index+ofst)
            stop = min(len(dta[0])-1, index+window+ofst)
            return start, stop

    def getUniqLst(self, onsets):
        # Reduces the onset list to onsets with distinct amplitudes, since
        # some points along the same slope will have the same amplitude
        compLst = []
        uniqLst = [onsets[0]]
        prevOnset = onsets[0]
        prevAmp = self.calcAmplitude(prevOnset, self.avgWin)
        ampLst = [prevAmp]
        for i in range(1, len(onsets)):
            currOnset = onsets[i]
            currAmp = self.calcAmplitude(currOnset, self.avgWin)
            if currOnset-prevOnset > 10 or not self.vecWithinPerc(currAmp, prevAmp, self.compPerc, 2):
                uniqLst.append(currOnset)
                ampLst.append(currAmp)
                prevAmp = currAmp
            prevOnset = currOnset
        return uniqLst, ampLst
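    # Example of the amplitude comparison in compareAmps above:
    # compareAmps([1.0, -2.0, 0.1], [-0.9, 1.9, 0.2], 0.2) returns True,
    # since components 0 and 1 have opposite signs and magnitudes within
    # 20% (plus tol) of each other; component 2 fails the opposite-sign
    # test, but only two rows need to pass.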
    def getNearbyAvg(self, index, window, direction, ofst=2, dta=None):
        # Get the average value of previous/next points in dta by the given
        # window, offset, and direction relative to the given index
        if dta is None:
            dta = self.data
        lft, rght = self.getWindow(index, window, direction, ofst=ofst, dta=dta)
        if rght - lft <= 1:
            avg = dta[:,index]
        else:
            avg = np.mean(dta[:,lft:rght], axis=1)
        return avg

    def getEdgeAvgs(self, index, window, ofst=2, dta=None):
        # Get average values to left and right of index by the given window
        leftAvg = self.getNearbyAvg(index, window, 'left', ofst=ofst, dta=dta)
        rightAvg = self.getNearbyAvg(index, window, 'right', ofst=ofst, dta=dta)
        return leftAvg, rightAvg

    def adjustIndex(self, index):
        if index in self.avgAmps and index not in self.adjustedAmps:
            # Look for an index w/ an amplitude close to the smoothed amplitude
            amp = self.avgAmps[index]
            startVal = None
            endVal = None
            for subIndex in range(index, index+5):
                subAmp = self.calcAmplitude(subIndex, self.avgWin, True)
                if self.vecWithinPerc(subAmp, amp, self.ringAmpCompPerc, 2) and (startVal is None or endVal == subIndex-1):
                    if startVal is None:
                        startVal = subIndex
                    endVal = subIndex
            if endVal is not None:
                if endVal-startVal > 1:
                    return int((startVal+endVal)/2)
                else:
                    return endVal

        # Adjust indices to be along the center of the slope for onsets/offsets
        lft, rght = self.findRelativeSlopeChanges(index)
        if abs(lft-rght) > 1:
            return int((lft+rght)/2)
        return index

    def findRelativeSlopeChanges(self, index, epsilon=None):
        if index in self.filterKeyMap and index not in self.adjustedAmps:
            refBase, refEdge = self.filterKeyMap[index]
            refIndex = index - refBase + self.ringEdgeWin
            refOfst = refBase - self.ringEdgeWin
            smoothDta = self.filterDtaDict[self.filterKeyMap[index]]
            lft, rght = self.findSlopeChanges(refIndex, smoothDta[self.refRow], epsilon)
            return lft+refOfst, rght+refOfst
        return self.findSlopeChanges(index, self.data[self.refRow], epsilon)

    def getVectorAvg(self, i, j, dta=None):
        if dta is None:
            dta = self.data
        # Bounds checking
        if i < 0:
            i = 0
            j = max(1, j)
        elif j >= len(dta[0]):
            i = len(dta[0]) - 1
        return np.mean(self.data[:,i:j+1], axis=1)

    def findSlopeChanges(self, index, dta, epsilon=None):
        if epsilon is None:
            epsilon = self.slopeThresh
        # Finds points where the slope changes sign before/after index
        prevSlope = self.getSlope(index, dta)
        prevIndex = index
        lastSlope = prevSlope
        while prevIndex >= 0 and np.sign(self.getSlope(prevIndex, dta)) == np.sign(prevSlope):
            if abs(self.getSlope(prevIndex, dta)) < epsilon:
                break
            lastSlope = self.getSlope(prevIndex, dta)
            prevIndex -= 1
        if index + 1 >= len(dta):
            return prevIndex, index
        nextSlope = self.getSlope(index+1, dta)
        if np.sign(self.getSlope(index, dta)) != np.sign(nextSlope):
            prevIndex = index
        nextIndex = index + 1
        lastSlope = nextSlope
        while nextIndex < len(dta) and np.sign(self.getSlope(nextIndex, dta)) == np.sign(nextSlope):
            if abs(self.getSlope(nextIndex, dta)) < epsilon:
                break
            lastSlope = self.getSlope(nextIndex, dta)
            nextIndex += 1
        if prevIndex == index and nextIndex == index + 1:
            return index, index + 1
        nextIndex = nextIndex - 1 # Adjust for offset
        return prevIndex, nextIndex

    def interpolateValues(self, i, j):
        interpTimes = self.times[i:j+1]
        lftVal = self.data[:,i]
        rghtVal = self.data[:,j]
        valStack = []
        for row in range(0, 3):
            interpDta = np.interp(interpTimes, [interpTimes[0], interpTimes[-1]],
                [lftVal[row], rghtVal[row]])
            valStack.append(interpDta)
        res = np.stack(valStack)
        return res
self.findSlopeChanges(index, self.data[1]) # Spike-type onsets if lft == rght: lft, rght_a = self.findSlopeChanges(index-1, self.data[1]) lft_a, rght = self.findSlopeChanges(index+1, self.data[1]) self.data[:,lft:rght+1] = self.interpolateValues(lft, rght) # Correction algorithms and helper functions def correctSpike(self, a, b): # Get left and right averages and use them to fill the corresponding # edges of the spike lftAvg = self.getNearbyAvg(a, self.avgWin, 'left') rghtAvg = self.getNearbyAvg(b, self.avgWin, 'right') lft, rght = self.findSlopeChanges(a, self.data[self.refRow]) otherLft, otherRght = self.findSlopeChanges(b, self.data[self.refRow]) self.fillRangeWithVec(lft, rght, lftAvg) self.fillRangeWithVec(rght, otherRght, rghtAvg) def correctSingleShift(self, start, stop, startAmp, endAmp, avgWindow=5, smoothDta=None): start = self.adjustIndex(start) stop = self.adjustIndex(stop) # if ssFlag: # plt.clf() # plt.plot(self.times[start-100:stop+100], self.data[1][start-100:stop+100]) # plt.plot(self.times[[start, stop]], self.data[1][[start,stop]], 'o') # if smoothDta is not None: # lft, rght = self.filterKeyMap[start] # plt.plot(self.times[lft-self.ringEdgeWin:rght+self.ringEdgeWin], smoothDta[1]) minEditedIndex = start maxEditedIndex = stop # Set values near shift start/stop indices to nearby averages for index, amp in [(start, startAmp), (stop, endAmp*(-1))]: # # Get left averaging window avgStart, avgEnd = self.getWindow(index, avgWindow, 'left') sigShift = None if index == stop and not self.ringingInRange(index-4, index+4): innerShifts = list(self.findOnsetsInWindow(avgStart, avgEnd, self.t0, self.t1)) for shift in innerShifts[::-1]: innerAmp = self.calcAmplitude(shift, avgWindow) count = 0 for row in range(0, 3): if abs(innerAmp[row]) > 2 * abs(amp[row]): count += 1 if count >= 2: sigShift = shift break if sigShift is not None: avgStart = sigShift avgVals = np.mean(self.data[:,avgStart:avgEnd+1], axis=1) avgVals = np.reshape(avgVals, (3, 1)) # Reshapes avg into a column # # Repeat steps above for right averages avgStart, avgEnd = self.getWindow(index+1, avgWindow, 'right') sigShift = None if index == start: innerShifts = self.findOnsetsInWindow(avgStart, avgEnd, self.t0, self.t1) for shift in innerShifts: innerAmp = self.calcAmplitude(shift, self.avgWin) count = 0 for row in range(0, 3): if abs(innerAmp[row]) > 2 * abs(amp[row]): count += 1 if count >= 2: sigShift = shift break if sigShift is not None: avgEnd = sigShift if avgEnd <= avgStart: avgStart = index - 2 avgValsE = np.mean(self.data[:,avgStart:avgEnd+1], axis=1) avgValsE = np.reshape(avgValsE, (3, 1)) eiO, eiE = self.getWindow(index+1, avgWindow+2, 'right', ofst=0) if sigShift is not None: eiE = sigShift if self.ringingInRange(eiO, eiE): eiO, eiE = self.getWindow(index, 2, 'right', ofst=0) # Adjust amps in final step lft, rght = self.findRelativeSlopeChanges(index) leftAvg = self.getNearbyAvg(lft, avgWindow, 'left') rightAvg = self.getNearbyAvg(rght, avgWindow, 'right') diff = np.reshape(rightAvg - leftAvg, 3) for row in range(0, 3): if (self.withinPerc(diff[row], amp[row], self.compPerc) or self.ringingInRange(eiO, eiE)): if index == start: startAmp[row] = diff[row] elif index == stop: endAmp[row] = diff[row] * (-1) a, b = self.smoothEdge(index, avgWindow, smoothDta=smoothDta) minEditedIndex = min(a, minEditedIndex) maxEditedIndex = max(b, maxEditedIndex) # if ssFlag: # plt.plot(self.times[start-100:stop+100], self.data[1][start-100:stop+100]) # Do a linear interpolation across times between start and stop # with the 
start/stop data values being set to startAmp and endAmp, # respectively startTime, stopTime = self.times[start], self.times[stop] interpTimes = self.times[start+1:stop+1] for row in range(0, 3): startVal = startAmp[row] endVal = endAmp[row] linAmp = np.interp(interpTimes, [startTime, stopTime], [startVal, endVal]) tempVal = self.data[row][start+1] adjustedLinAmp = linAmp + tempVal dta = self.data[row][start+1:stop+1] self.data[row][start+1:stop+1] = dta - linAmp # if ssFlag: # plt.plot(self.times[start-100:stop+100], self.data[1][start-100:stop+100]) # plt.show() self.markCorrection('Shift', minEditedIndex, maxEditedIndex) def correctCompound(self, indexLst, ampLst=[], smoothDta=None): self.ringUpdate(indexLst[0], indexLst[-1]) indexLst.sort() # First correct any spike shifts accordingly and remove them from the list rmvLst = [] newLst = indexLst[:] for z in range(1, len(indexLst)): i = indexLst[z-1] j = indexLst[z] if self.ringingInRange(i-50, j+50): continue if i in self.ringSet or j in self.ringSet: continue amp_i = self.calcAmplitude(i, self.avgWin) amp_j = self.calcAmplitude(j, self.avgWin) lftAvg = self.getNearbyAvg(i, self.avgWin, 'left') rghtAvg = self.getNearbyAvg(j, self.avgWin, 'right') if self.checkForSpikeShift(i, j, amp_i, amp_j): self.correctSpike(i, j) rmvLst.append(i) rmvLst.append(j) for z in rmvLst: if z in newLst: newLst.remove(z) if len(newLst) <= 1: if len(newLst) > 0: self.markCorrection('Compound', indexLst[0], indexLst[-1]) return newLst = [self.adjustIndex(index) for index in indexLst] indexLst = newLst # Create a baseline to calculate amplitudes relative to, as each partial # shift correction is completed start, stop = indexLst[0], indexLst[-1] sI = start eI = stop initVal, lastVal = self.data[:,start], self.data[:,stop] startTime, stopTime = self.times[start], self.times[stop] interpTimes = self.times[start:stop+1] ampStack = [] for row in range(0, 3): refLft, refRght = self.findRelativeSlopeChanges(start) initVal = self.getNearbyAvg(refLft, self.avgWin, 'left') linAmp = [initVal[row]] * (len(interpTimes)) ampStack.append(linAmp) linAmp = np.stack(ampStack) # Adjust baseline so small natural changes in slopes are maintained self.fitSlopes(indexLst, linAmp, smoothDta=smoothDta) # Use a regular linearly interpolated linAmp for small ringing shift corrections refLft, refRght = self.findRelativeSlopeChanges(start) otherLft, refRght = self.findRelativeSlopeChanges(stop) leftAvg = self.getNearbyAvg(refLft, self.avgWin, 'left', ofst=0) rightAvg = self.getNearbyAvg(refRght, self.avgWin, 'right', ofst=0) if self.vecWithinEpsilon(leftAvg, rightAvg, 2, 2) and stop-start < 400: linAmp = [] for row in range(0, 3): interpAmp = np.interp(interpTimes, [interpTimes[0], interpTimes[-1]], [leftAvg[row], rightAvg[row]]) linAmp.append(interpAmp) linAmp = np.stack(linAmp) # Start keeping track of changed indices for quality flag marking purposes minEditedIndex = indexLst[0] maxEditedIndex = indexLst[-1] # Correct indices that are very close together by linearly interpolating # between the left endpoint of the starting index and the right endpoint # of the ending index and remove the smaller index from the onset list baseAmps = [] changedIndices = set() indexLst.sort() for z in range(1, len(indexLst)): sI = indexLst[z-1] eI = indexLst[z] if eI - sI <= 7 and not self.ringingInRange(sI, eI): amp_s = self.calcAmplitude(sI, self.avgWin) amp_e = self.calcAmplitude(eI, self.avgWin) if self.vecNorm(amp_s) < self.vecNorm(amp_e) + 0.1: changedIndices.add(sI) else: changedIndices.add(eI) lft1, 
rght1 = self.findSlopeChanges(sI, self.data[self.refRow]) lft2, rght2 = self.findSlopeChanges(eI, self.data[self.refRow]) subInterpTimes = self.times[lft1:rght2+1] for row in range(0, 3): avg = np.interp(subInterpTimes, [subInterpTimes[0], subInterpTimes[-1]], [self.data[row][lft1], self.data[row][rght2]]) self.data[row][lft1:rght2+1] = avg self.ringUpdate(sI, eI) for subIndex in changedIndices: if subIndex in indexLst: indexLst.remove(subIndex) for z in range(1, len(indexLst)): sI = indexLst[z-1] eI = indexLst[z] sAmp, eAmp, win = self.getBaselineAmps(sI, eI, start, linAmp) baseAmps.append((sAmp, eAmp)) # For each pair of indices sufficiently spaced apart, smooth the right # edge and use their respective baseline amps (linearly interpolated # between each pair for each row) to shift the section of data to # the baseline, updating the baselineAmps as needed if the smoothing # operation might have affected values for another pair of indices prevSmoothed = False spikeStatuses = {} lastStartRange = None lastRange = None lastChanged = None for z in range(1, len(indexLst)): sI = indexLst[z-1] eI = indexLst[z] sAmp, eAmp = baseAmps[z-1] # if cpFlag: # plt.clf() # plt.plot(self.times[start-100:stop+100], self.data[1][start-100:stop+100]) # plt.plot(self.times[[sI, eI]], self.data[1][[sI, eI]], 'o') if eI - sI <= 7: continue # If last smooth op affected nearby values, update baseline amps if lastRange is not None: a, b = lastRange if b <= eI: sAmp, eAmp, win = self.getBaselineAmps(sI, eI, start, linAmp) win = self.avgWin if eI != indexLst[-1] and indexLst[z+1] - eI <= 10: win = max(indexLst[z+1] - eI - 2, 1) # Smooth right edge and store the tuple of indices that were changed if prevSmoothed == False: # First edge needs to be smoothed spikeStatuses[sI] = self.checkSpikeLikeStatus(sI) prevRange = self.smoothEdge(sI, 5, smoothDta=smoothDta) prevSmoothed = True lastChanged = sI if eI - sI <= 7: lastRange = prevRange sAmp, eAmp, win = self.getBaselineAmps(sI, eI, start, linAmp) else: lastStartRange = prevRange spikeStatuses[eI] = self.checkSpikeLikeStatus(eI) lastRange = self.smoothEdge(eI, win, smoothDta=smoothDta) else: lastStartRange = lastRange spikeStatuses[eI] = self.checkSpikeLikeStatus(eI) lastRange = self.smoothEdge(eI, win, smoothDta=smoothDta) # Adjust endpoints for quality flag marking a, b = lastRange minEditedIndex = min(minEditedIndex, a) maxEditedIndex = max(maxEditedIndex, b) sI = lastChanged # if cpFlag: # plt.plot(self.times[start-100:stop+100], self.data[1][start-100:stop+100]) # Interpolate between starting/ending baseline amps and subtract # from data interpTimes = self.times[sI+1:eI+1] interpStack = [np.interp(interpTimes, self.times[[sI, eI]], [sAmp[row], eAmp[row]]) for row in range(0, 3)] interpStack = np.stack(interpStack) self.data[:,sI+1:eI+1] = self.data[:,sI+1:eI+1] - interpStack # Correct spikes for ringing corrections self.clampEdge(sI, spikeStatuses[sI], lastStartRange) lastChanged = eI # if cpFlag: # if indexLst[-1] in self.filterKeyMap or indexLst[0] in self.filterKeyMap: # if indexLst[-1] in self.filterKeyMap: # keyLeft, keyRight = self.filterKeyMap[indexLst[-1]] # else: # keyLeft, keyRight = self.filterKeyMap[indexLst[0]] # tempDta = self.filterDtaDict[(keyLeft, keyRight)] # plt.plot(self.times[keyLeft-self.ringEdgeWin:keyRight+self.ringEdgeWin], tempDta[1]) # plt.plot(self.times[start-100:stop+100], self.data[1][start-100:stop+100]) # plt.plot(self.times[sI+1:eI+1], linAmp[1][sI-indexLst[0]+1:eI-indexLst[0]+1]) # plt.show() # Update stored filtered data 
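# ringUpdate (defined elsewhere in this file) appears to refresh the stored
# filtered/ringing data (ringSet, filterKeyMap, filterDtaDict) after self.data
# has been edited in place; the +/-5 padding covers points moved by the edge
# smoothing above. A sketch of the recurring pattern, indices hypothetical:
#     self.data[:, 1000:1200] -= interpStack   # in-place correction...
#     self.ringUpdate(995, 1205)               # ...then refresh a padded range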
self.ringUpdate(sI-5, eI+5) # Update data quality flag and avgAmps dictionary if len(spikeStatuses) != 0: eI = max(spikeStatuses.keys()) self.clampEdge(eI, spikeStatuses[eI], lastRange) self.markCorrection('Compound', minEditedIndex, maxEditedIndex) def fitSlopes(self, indexLst, linAmp, smoothDta=None): # Get the slopes along the individual curves in the shift and # match them along the baseline, resetting the baseline after each # adjustment start = indexLst[0] stop = indexLst[-1] fullStart, fullStop = start, stop rightInnerAvg = None for i in range(1, len(indexLst)): stop = indexLst[i] if stop-start <= 7: start = stop continue lft, rght = self.findRelativeSlopeChanges(start) lftStop, rghtStop = self.findRelativeSlopeChanges(stop) win = min(self.avgWin, max(lftStop-rght-2, 3)) leftInnerAvg = self.getNearbyAvg(rght, win, 'right', ofst=2) rightInnerAvg = self.getNearbyAvg(lftStop, win, 'left', ofst=2) if self.ringingInRange(rght, lftStop) and lftStop - rght < 15: avgVal = (leftInnerAvg + rightInnerAvg) / 2 leftInnerAvg = avgVal rightInnerAvg = avgVal intrpTimes = self.times[start:stop+1] diff = rightInnerAvg - leftInnerAvg # Interpolate from 0 to the difference along the line segment for row in range(0, 3): interpSlope = np.interp(intrpTimes, [intrpTimes[0], intrpTimes[-1]], [0, diff[row]]) # Add in slope difference and set new baseline linAmp[row][start-fullStart+1:stop-fullStart+1] = linAmp[row][start-fullStart+1:stop-fullStart+1] + interpSlope[1:] numPts = fullStop-fullStart-(stop-fullStart) linAmp[row][stop-fullStart+1:] += diff[row] # if linAmpFlag: # plt.clf() # plt.plot(self.times[fullStart:fullStop+1], linAmp[1]) # plt.plot(self.times[[start, stop]], self.data[1][[start, stop]], 'o') # plt.plot(self.times[[lft, rght]], self.data[1][[lft, rght]]) # plt.plot(self.times[[lftStop, rghtStop]], self.data[1][[lftStop, rghtStop]]) # plt.plot(self.times[fullStart-100:fullStop+100], self.data[1][fullStart-100:fullStop+100]) # plt.show() start = stop # Final adjustment to make sure ending value of linAmp is close to # nearby right avg if rightInnerAvg is not None: lft, rght = self.findRelativeSlopeChanges(indexLst[-1]) rightInnerAvg = self.getNearbyAvg(rght, self.avgWin, 'right', ofst=0) endLinAmp = np.mean(linAmp[:,len(linAmp)-5:], axis=1) for row in range(0, 3): diff = rightInnerAvg[row] - endLinAmp[row] interpTimes = self.times[fullStart:fullStop+1] interpDiff = np.interp(interpTimes, [interpTimes[0], interpTimes[-1]], [0, diff]) linAmp[row] += interpDiff # if linAmpFlag: # plt.clf() # plt.plot(self.times[fullStart:fullStop+1], linAmp[1]) # plt.plot(self.times[[start, stop]], self.data[1][[start, stop]], 'o') # plt.plot(self.times[[lft, rght]], self.data[1][[lft, rght]]) # plt.plot(self.times[[lftStop, rghtStop]], self.data[1][[lftStop, rghtStop]]) # plt.plot(self.times[fullStart-100:fullStop+100], self.data[1][fullStart-100:fullStop+100]) # plt.show() def getBaselineAmps(self, sI, eI, s0, linAmp): # Gets the inner left and inner right amplitudes at the indices sI and eI # relative to the corresponding linAmp values (offsetted by s0) lft, rght = self.findRelativeSlopeChanges(sI) oldLft, oldRght = lft, rght lft, rght = self.findRelativeSlopeChanges(eI) if lft - oldRght <= 7: win = max(int((lft-oldRght)/2), 1) else: win = self.avgWin innerLeft = self.getNearbyAvg(oldRght, win, 'right', ofst=2) innerRight = self.getNearbyAvg(lft, win, 'left', ofst=2) if self.ringingInRange(oldRght, lft) and lft - oldRght < 15: avgVal = (innerLeft + innerRight) / 2 innerLeft = avgVal innerRight = avgVal 
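# The averages above are the field values just inside the shift's endpoints;
# the lookups below sample the synthetic baseline (linAmp) at the same
# positions, offset by its start index s0. Worked example with hypothetical
# values: if the field just right of sI averages -1412.0 nT while the
# baseline there is -1410.0 nT, then sAmp = -2.0 nT, and subtracting the
# interpolated amplitude later raises that segment onto the baseline.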
baseInnerLeft = self.getNearbyAvg(sI-s0, win, 'right', dta=linAmp) baseInnerRight = self.getNearbyAvg(eI-s0, win, 'left', dta=linAmp) sAmp = innerLeft - baseInnerLeft eAmp = innerRight - baseInnerRight return sAmp, eAmp, win def fillRangeWithVec(self, i, j, vec): vec = np.reshape(vec, (3,1)) numPts = j - i + 1 self.data[:,i:j+1] = np.tile(vec, numPts) def smoothEdge(self, index, window, smoothDta=None): # Function used to smooth data to left/right of shift before the values # are shifted down amp = self.calcAmplitude(index, self.avgWin) left_a, left_b = self.getWindow(index, window, 'left') right_a, right_b = self.getWindow(index, window, 'right') lft, rght = self.findRelativeSlopeChanges(index) if lft != rght: left_a, left_b = self.getWindow(lft, window, 'left', ofst=2) right_a, right_b = self.getWindow(rght, window, 'right', ofst=2) # Extend window if slope is composed of more points addLeft = index-lft-1 if abs(rght-lft) > 1 else 0 addRight = rght-index-1 if abs(rght-lft) > 1 else 0 if addLeft < window: addLeft = 0 if addRight < window: addRight = 0 leftReasonable = not self.ringingInRange(left_a, lft) rightReasonable = not self.ringingInRange(rght, right_b) sigShiftLeft = None sigShiftRight = None # Check for significant shift on left edge innerShifts = list(self.findOnsetsInWindow(left_a, left_b, self.t0, self.t1)) for shift in innerShifts[::-1]: innerAmp = self.calcAmplitude(shift, self.avgWin) count = 0 for row in range(0, 3): if abs(innerAmp[row]) > 1.25 * abs(amp[row]): count += 1 if count >= 2 and not self.compareAmps(amp, innerAmp, self.compPerc): sigShiftLeft = shift break # Check for significant shift on right edge innerShifts = self.findOnsetsInWindow(right_a, right_b, self.t0, self.t1) for shift in innerShifts: innerAmp = self.calcAmplitude(shift, self.avgWin) count = 0 for row in range(0, 3): if abs(innerAmp[row]) > 1.25 * abs(amp[row]): count += 1 if count >= 2: sigShiftRight = shift break # Smooth both sides, bounded by sig shifts if both are reasonable if leftReasonable and rightReasonable: if sigShiftLeft is not None: left_a = sigShiftLeft if left_b <= left_a: left_a = sigShiftLeft left_b = index if sigShiftRight is not None: right_b = sigShiftRight if right_b <= right_a: right_b = index+1 right_a = sigShiftRight lf_a, lf_b = self.getWindow(index, window+addLeft, 'left', ofst=0) rf_a, rf_b = self.getWindow(index+1, window+addRight, 'right', ofst=0) if sigShiftLeft is not None: lf_b = sigShiftLeft if lf_b <= lf_a: lf_a = sigShiftLeft lf_b = index if sigShiftRight is not None: rf_b = sigShiftRight if rf_b <= rf_a: rf_b = index + 1 rf_a = sigShiftRight # Interpolate for small ranges if abs(rf_b - lf_a) < window: self.data[:,lf_a:rf_b+1] = self.interpolateValues(lf_a, rf_b) else: # Regular fill/average otherwise if left_b - left_a <= 1: leftAvg = self.data[:,left_a] else: leftAvg = np.mean(self.data[:,left_a:left_b+1], axis=1) if right_b - right_a <= 1: rightAvg = self.data[:,right_b] else: rightAvg = np.mean(self.data[:,right_a:right_b+1], axis=1) self.fillRangeWithVec(lf_a, lf_b, leftAvg) self.fillRangeWithVec(rf_a, rf_b, rightAvg) # if cpFlag: # plt.plot(self.times[[left_a, left_b]], self.data[1][[left_a, left_b]], 'o') # plt.plot(self.times[[right_a, right_b]], self.data[1][[right_a, right_b]], 'o') return lf_a, rf_b elif leftReasonable: if sigShiftLeft is not None: left_a = sigShiftLeft left_b = index lf_a, lf_b = self.getWindow(index, window+addLeft, 'left', ofst=0) if sigShiftLeft is not None: lf_b = sigShiftLeft if lf_b <= lf_a: lf_a = sigShiftLeft lf_b = index if 
abs(lf_b - lf_a) < window: self.data[:,lf_a:lf_b+1] = self.interpolateValues(lf_a, lf_b) else: leftAvg = np.mean(self.data[:,left_a:left_b+1], axis=1) self.fillRangeWithVec(lf_a, lf_b, leftAvg) self.fillRangeWithVec(lf_b+1, index, leftAvg + amp) # if cpFlag: # plt.plot(self.times[[left_a, left_b]], self.data[1][[left_a, left_b]], 'o') return lf_a, lf_b elif rightReasonable: if sigShiftRight is not None: right_a = index + 1 right_b = sigShiftRight rf_a, rf_b = self.getWindow(index+1, window+addRight, 'right', ofst=0) if sigShiftRight is not None: rf_b = sigShiftRight if rf_b <= rf_a: rf_b = index + 1 rf_a = sigShiftRight if abs(rf_b-rf_a) < window: self.data[:,rf_a:rf_b+1] = self.interpolateValues(rf_a, rf_b) else: rightAvg = np.mean(self.data[:,right_a:right_b+1], axis=1) self.fillRangeWithVec(rf_a, rf_b, rightAvg) self.interpolateEdge(index) # if cpFlag: # plt.plot(self.times[[left_a, left_b]], self.data[1][[left_a, left_b]], 'o') return rf_a, rf_b else: self.interpolateEdge(index) return index-1, index + 1 def checkSpikeLikeStatus(self, index): # Adjust indices marking left edge and right edge for potential spikes # based on the slope sign prevSlope = self.getSlope(index, self.data[self.refRow]) nextSlope = self.getSlope(index+1, self.data[self.refRow]) prevSign = np.sign(prevSlope) nextSign = np.sign(nextSlope) if prevSign == nextSign: lft, rght = self.findSlopeChanges(index, self.data[self.refRow]) otherLft, otherRght = self.findSlopeChanges(index+1, self.data[self.refRow]) else: lft, rght = self.findSlopeChanges(index-1, self.data[self.refRow]) otherLft, otherRght = self.findSlopeChanges(index, self.data[self.refRow]) if np.sign(prevSlope) == 0: otherLft, otherRght = self.findSlopeChanges(index, self.data[self.refRow]) otherLft, otherRght = self.findSlopeChanges(otherRght, self.data[self.refRow]) # Find the left/right nearby avgs and amps amp_lft = self.data[:,otherLft] - self.data[:,lft] amp_rght = self.data[:,otherRght] - self.data[:,otherLft] rightAvg = self.getNearbyAvg(rght, self.avgWin, 'right') leftAvg = self.getNearbyAvg(lft, self.avgWin, 'left') leftVal = self.data[:,lft] rightVal = self.data[:,rght] # Use the maximum val from the left/right amps to compute a # bounding amplitude value for potential left/right nearby spike-like artifacts maxVals = [[max([abs(amp_lft[row]), abs(amp_rght[row])])] for row in range(0, 3)] minVec = np.stack(maxVals) minVec = minVec * 0.5 lft_a, rght_a = self.findSlopeChanges(lft-1, self.data[self.refRow]) leftLeftAmp = self.data[:,rght_a] - self.data[:,lft_a] lft_b, rght_b = self.findSlopeChanges(otherRght, self.data[self.refRow]) rightRightAmp = self.data[:,rght_b] - self.data[:,lft_b] # For each row, check if the spike's amplitudes are large and if the # data pts immediately to the left/right of it have small amplitudes statusVec = [False] * 3 for row in range(0, 3): largeAmp = (np.abs(amp_lft[row]) > 1 or np.abs(amp_rght[row]) > 1) smallSum = (abs(amp_rght[row] + amp_lft[row]) < minVec[row]) smallEdges = (abs(leftLeftAmp[row]) < minVec[row] and abs(rightRightAmp[row] < minVec[row])) if largeAmp and smallSum and smallEdges: statusVec[row] = True return statusVec def findIndexAboveBelowVal(self, indexRange, val, direction, row, lr): # indexRange = range to look at # val = value to serve as cutoff when looping through indexRange # row = component to look at # direction = whether the starting value is above or below 'val' # lr = direction to look at indexRange in, left to 'right' or right to 'left' a, b = indexRange stopIndex = a start, stop = a, 
b+1 step = 1 if lr == 'left': step = -1 start = b stop = a-1 if direction == 'above': for index in range(start, stop, step): if self.data[row][index] <= val: stopIndex = index break else: for index in range(start, stop, step): if self.data[row][index] >= val: stopIndex = index break return stopIndex def clampEdge(self, index, spikeStatus, smoothRange): if not self.ringingInRange(index-10, index+10): return a, b = smoothRange rightCorrection = a >= index - 1 leftCorrection = b <= index + 1 # Adjust index used for spike status if index falls along a point in a # potential spike that makes it hard to evaluate leftSlopeVec = [self.getSlope(index, self.data[row]) for row in range(0, 3)] if np.array_equal(leftSlopeVec, np.zeros(3)): # If point is along smoothed data (slope = 0) newStatus = self.checkSpikeLikeStatus(index+1) else: newStatus = self.checkSpikeLikeStatus(index) if rightCorrection: # Or if point is along the right edge of a spike if np.sign(leftSlopeVec[self.refRow]) > 0: lft, rght = self.findSlopeChanges(index, self.data[self.refRow]) if np.sign(self.getSlope(rght+1, self.data[self.refRow])) == 0: newStatus = self.checkSpikeLikeStatus(lft) # Interpolated values should not be corrected for spikes if leftCorrection and rightCorrection: return for row in range(0, 3): # Only continue if spike is seen where one wasn't previously seen if spikeStatus[row] == True or newStatus[row] == False: continue leftSlope = self.getSlope(index, self.data[row]) if rightCorrection: # If index along positive slope if np.sign(leftSlope) > 0: otherLft, otherRght = self.findSlopeChanges(index, self.data[row]) if np.sign(self.getSlope(otherRght+1, self.data[row])) == 0: # Zero right slope -> downward spike lft, rght = self.findSlopeChanges(otherLft-1, self.data[row]) leftSlope = self.getSlope(otherLft, self.data[row]) else: # Upward spike lft, rght = otherLft, otherRght otherLft, otherRght = self.findSlopeChanges(rght, self.data[row]) leftSlope = self.getSlope(rght, self.data[row]) else: # Beginning of a downward spike lft, rght = self.findSlopeChanges(index-1, self.data[row]) otherLft, otherRght = self.findSlopeChanges(rght, self.data[row]) if leftCorrection: if np.sign(leftSlope) == 0: # Start of spike lft, rght = self.findSlopeChanges(index, self.data[row]) otherLft, otherRght = self.findSlopeChanges(rght, self.data[row]) else: # Somewhere along spike lft, rght = self.findSlopeChanges(index-1, self.data[row]) otherLft, otherRght = self.findSlopeChanges(index, self.data[row]) leftAvg = self.getNearbyAvg(lft, self.avgWin, 'left') rightAvg = self.getNearbyAvg(otherRght, self.avgWin, 'right') if leftCorrection and np.sign(leftSlope) == 0: leftSlope = self.getSlope(index+1, self.data[row]) # For each correction type, find the point where the spike is above # or below the desired value and set the values between the spike's # start/end and that point to the appropriate average if leftCorrection: start = lft if np.sign(leftSlope) > 0: stop = self.findIndexAboveBelowVal((rght, otherRght), leftAvg[row], 'above', row, 'right') else: stop = self.findIndexAboveBelowVal((rght, otherRght), leftAvg[row], 'below', row, 'right') self.data[row][start:stop] = np.tile(leftAvg[row], stop-start) elif rightCorrection: stop = otherRght if np.sign(leftSlope) > 0: start = self.findIndexAboveBelowVal((lft, rght), rightAvg[row], 'above', row, 'left') else: start = self.findIndexAboveBelowVal((lft, rght), rightAvg[row], 'below', row, 'left') self.data[row][start:stop] = np.tile(rightAvg[row], stop-start) # Validity checking def 
compareHeights(self, onsetLst, amp_a, amp_b):
    # Gets the sum of the amplitudes of the onsets and checks to see
    # if there is likely a large unresolved shift in between the starting
    # and ending index for each row (denoted by a large sumAmp significantly
    # larger than the starting / ending amps)
    avgAmp = (amp_a+amp_b)/2
    uniqOnsets, uniqAmps = self.getUniqLst(onsetLst)
    if len(uniqOnsets) <= 2:
        return True
    elif len(uniqOnsets) <= 3:
        sumAmp = self.calcAmplitude(uniqOnsets[1], self.avgWin)
    else:
        sumAmp = sum(uniqAmps)
    count = 0
    matchingRows = []
    for row in range(0, 3):
        if abs(sumAmp[row]) > abs(avgAmp[row]*0.5):
            matchingRows.append(row)
            count += 1
    # Additional checking to make sure a large unresolved shift in a single
    # component is not a result of the component having only very small changes
    # in value (e.g. < 1 nT)
    if count == 1:
        row = matchingRows[0]
        rowAmps = np.array([amp[row] for amp in uniqAmps])
        if True not in (np.abs(rowAmps) > 0.5):
            count -= 1
        elif abs(amp_a[row]) < 1 and abs(amp_b[row]) < 1 and abs(sumAmp[row]) < 0.5:
            count -= 1
    if count >= 1:
        return False
    else:
        return True

def checkCompHeights(self, onsets):
    epsilon = 1 # nT
    start, stop = onsets[0], onsets[-1]
    # First checks if the starting and ending values (averaged) are very close
    # to each other, or if the difference between the inner averages
    # added to the starting value helps satisfy this condition
    leftAvg = self.getNearbyAvg(start, self.avgWin, 'left')
    rightAvg = self.getNearbyAvg(stop, self.avgWin, 'right')
    if self.vecWithinEpsilon(leftAvg, rightAvg, epsilon):
        return True
    leftInner = self.getNearbyAvg(start, self.avgWin, 'right')
    rightInner = self.getNearbyAvg(stop, self.avgWin, 'left')
    diff = rightInner - leftInner
    if self.vecWithinEpsilon(leftAvg + diff, rightAvg, epsilon):
        return True
    # Similar calculation to checkRowDiff, calculates difference between
    # inner averages and makes sure the difference between the start/end
    # values is close to the sum of the inner avg diffs for at least 2 components
    sumDiff = np.zeros(3)
    for i in range(1, len(onsets)):
        a, b = onsets[i-1], onsets[i]
        if b-a <= 4:
            continue
        innerLeft = self.getNearbyAvg(a, self.avgWin, 'right')
        innerRight = self.getNearbyAvg(b, self.avgWin, 'left')
        diff = innerRight - innerLeft
        sumDiff += diff
    edgeDiff = rightAvg - leftAvg
    if self.vecWithinPerc(sumDiff, edgeDiff, self.validityPerc, 2):
        return True
    return False

def checkRowDiff(self, uniqLst, sumAmp, row):
    # Finds the inner averages between each pair of sufficiently spaced apart
    # indices, then sums up the differences between the left inner avg and
    # right inner avg and compares it to the sumAmp
    nearAvgWin = 3
    diffTotal = 0
    for z in range(1, len(uniqLst)):
        a, b = uniqLst[z-1], uniqLst[z]
        lft, rght = self.findRelativeSlopeChanges(a)
        otherLft, otherRight = self.findRelativeSlopeChanges(b)
        if b - a < 10:
            continue
        elif self.ringingInRange(lft+2, otherRight-2):
            continue
        leftInner = self.getNearbyAvg(a, nearAvgWin, 'right', ofst=0)
        rightInner = self.getNearbyAvg(b, nearAvgWin, 'left', ofst=0)
        diff = rightInner - leftInner
        diffTotal += diff[row]
    smallDiff = (abs(sumAmp[row]) > 1 and (diffTotal + sumAmp[row]) < self.t1)
    if self.withinPerc(diffTotal, sumAmp[row]*(-1), self.compPerc) or smallDiff:
        return True
    return False

def checkForLargeGlitch(self, uniqLst):
    # Check if in range where tooth-like spike appears
    mlst_a, mlst_b = 7.75, 8.5
    ampThresh = 4.5
    a, b = uniqLst[0], uniqLst[-1]
    if (self.MLST[a] < mlst_a or self.MLST[a] > mlst_b) and self.MLST[b] < mlst_a:
        return False
    if self.MLST[a] > mlst_b and (self.MLST[b] > mlst_b or
self.MLST[b] < mlst_a): return False # Then check for any indices with a large amplitude in between the # starting and ending indices compRange = set([i for i in range(a, b)]) intersection = compRange & set(self.natSet) if len(intersection) > 0: for index in range(min(intersection), max(intersection)): amp = self.calcAmplitude(index, self.avgWin) mask = np.abs(amp) > ampThresh if True in mask: return True return False # Shift detection functions def checkForCompound(self, onsets, onAmpLst, skipRing=True, smoothDta=None, perc=None): # Get a list of unique onsets in the onsets list and corresp. amplitudes uniqLst = [onsets[0]] prevOnset = onsets[0] prevAmp = onAmpLst[0] ampLst = [prevAmp] for i in range(1, len(onsets)): currOnset = onsets[i] currAmp = onAmpLst[i] if abs(prevOnset-currOnset) > 15 or not self.vecWithinPerc(currAmp, prevAmp, self.validityPerc, 2): uniqLst.append(currOnset) ampLst.append(currAmp) prevAmp = currAmp prevOnset = currOnset uniqLst = [self.adjustIndex(index) for index in uniqLst] if len(uniqLst) < 3: # Compound shifts should have at least 3 changes return False, None, None # Make a list of indices to not end a potential compound shift on # because there is a nearby offset correcting it startPairs = [] for i in range(1, len(uniqLst)): a, b = uniqLst[i-1], uniqLst[i] if b - a < 20 and self.compareAmps(ampLst[i-1], ampLst[i], self.validityPerc): startPairs.append(a) potentialMatches = [] for k in range(3, len(uniqLst)+1): # Make sure this compound shift does not involve one of the tooth-like # artifacts that appear periodically throughout the data if self.checkForLargeGlitch(uniqLst[:k]): continue if self.checkStartEndForSpike(uniqLst[:k]): continue # Skip starting indices in startPairs and combinations that are # composed primarily of onsets in the ringing set if uniqLst[k-1] in startPairs: continue count = 0 for index in uniqLst[:k]: if index in self.ringSet: count += 1 if (len(uniqLst[:k]) - count) < 2: continue # Make sure ending amp is reasonable compared to averaged amplitude lft, rght = self.findRelativeSlopeChanges(uniqLst[k-1]) endAvgAmp = self.getNearbyAvg(rght, 3, 'right', ofst=0) - self.getNearbyAvg(lft, 3, 'left', ofst=0) center = int((lft+rght/2)) if not self.vecWithinPerc(ampLst[k-1], endAvgAmp, self.validityPerc, 2) or center in self.ringSet: continue # Get the absolute maximum value for each row of the amplitudes # involved in the current combination of onsets/offsets compVal = np.zeros(3) for row in range(0, 3): maxVal = 0 for e in ampLst[:k]: if np.abs(e[row]) > np.abs(maxVal): maxVal = np.abs(e[row]) compVal[row] = maxVal # Use this maximum value vector multiplied by some small percentage # to create a vector that serves as an upper bound for the # absolute value of the sum of the amplitudes when checking if # this compound shift correction is reasonable count = 0 minVec = np.zeros(3) mostGreat = False for row in range(0, 3): # Check if most amplitudes are large if compVal[row] > 1: count += 1 if count >= 2: mostGreat = True if perc is None: # Use custom percentage value if necessary minVec = compVal * 0.1 else: minVec = compVal * perc for row in range(0, 3): # If most amps are large and one component has a very small # max amp, then use a large percentage if not mostGreat or compVal[row] >= 1: minVec[row] = compVal[row] * perc else: minVec[row] = compVal[row] * 0.25 # Get the sum of the amplitudes and a list of rows that are lower # than the corresponding minVec value, as well as the count sumAmp = sum(ampLst[:k]) count = 0 matchingRows = [] 
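# Rows whose summed amplitude nearly cancels (|sumAmp[row]| below
# minVec[row] + tol) are collected as matching below; the rest feed the
# fallback checks further down. Hypothetical example: onset amps of +3.1,
# -1.4 and -1.6 nT in one component sum to +0.1 nT, which with a 10%
# bound (0.31 nT plus the tolerance) would count that row as matching.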
nonMatchingRows = [] for row in range(0, 3): if np.abs(sumAmp[row]) < minVec[row] + self.tol: count += 1 matchingRows.append(row) else: nonMatchingRows.append(row) # Remcount is used when two components satisfy the first property # above, and additional criteria are used to see if the third # component is still reasonable remCount = 0 if count == 2: for i in range(0, 3): if i not in matchingRows: # Third component should be less than 2/3rds of its abs maximum val if np.abs(sumAmp[i]) < max((compVal*0.66)[i]+self.tol, 0.5): remCount += 1 # Additionally, if the natural variations in the field's value # between onsets are roughly equal to remaining row's sumAmp # then this is also reasonable. Also reasonable is having two # components with particularly small sumAmps if count == 2 and remCount == 0: subCount = 0 otherCount = 0 for i in range(0, 3): if i in matchingRows: if np.abs(sumAmp[i]) < (minVec[i] * (0.555)): subCount += 1 if np.abs(sumAmp[i]) < 0.1: otherCount += 1 if subCount == 2: remCount += 1 elif self.checkRowDiff(uniqLst[:k], sumAmp, nonMatchingRows[0]): remCount += 1 suffCount = ((count == 2 and remCount == 1) or (count == 3)) ringCompCorr = (self.ringingInRange(uniqLst[0], uniqLst[-1]) and (uniqLst[-1] - uniqLst[0]) < 400) if suffCount and (ringCompCorr or self.checkCompHeights(uniqLst[:k])): potentialMatches.append((k, sumAmp)) if len(potentialMatches) == 0: return False, None, None # Look for the potential compound shift with the smallest sumAmp magnitude min_k = potentialMatches[0][0] for k, sumAmp in potentialMatches: if self.vecNorm(sumAmp) < self.vecNorm(sum(ampLst[:min_k])): min_k = k sumAmp = sum(ampLst[:min_k]) resUniqLst = uniqLst[:min_k] return True, resUniqLst, sumAmp def checkForSimpleShift(self, i, j, amp_i, amp_j, perc, betweenLst): # betweenLst is the list of potential onsets detected between i and j if not self.checkIfOnset(np.abs(amp_j)): return False if not self.compareAmps(amp_i, amp_j, perc): # Similar values, opposite signs return False # Ending val is reasonable and there aren't large unresolved shifts in-between if not self.compareHeights([i]+betweenLst+[j], amp_i, amp_j*(-1)): return False if self.checkStartEndForSpike([i, j]): return False return True def checkForSpikeShift(self, a, b, amp_a, amp_b): slopePerc = 0.66 epsilon = 1 # nT # Check if indices are very close together and have similar amplitudes if b - a > 20: return False if not self.compareAmps(amp_a, amp_b, self.validityPerc): return False # Find where each onset begins/ends aLft, aRght = self.findSlopeChanges(a, self.data[self.refRow]) # if aRght == aLft: # aLft = a # aRght = a + 1 bLft, bRght = self.findSlopeChanges(b, self.data[self.refRow]) # if bLft == aRght: # bLft = b # bRght = b + 1 # Check if nearby slopes are much smaller than the slope of the spike leftSlope = (self.data[:,aRght] - self.data[:,aLft]) / (self.times[aRght] - self.times[aLft]) rightSlope = (self.data[:,bRght] - self.data[:,bLft]) / (self.times[bRght] - self.times[bLft]) outerLeftSlopeAvg = np.mean([self.getSlope(i, self.data[self.refRow]) for i in range(aLft-self.avgWin, aLft)]) outerRightSlopeAvg = np.mean([self.getSlope(i, self.data[self.refRow]) for i in range(bRght, bRght+self.avgWin)]) if abs(outerLeftSlopeAvg) > abs(leftSlope[self.refRow]*slopePerc) or abs(outerRightSlopeAvg) > abs(rightSlope[self.refRow]*slopePerc): return False # Lastly, check if field values to the left and right of the spike are similar leftAvg = self.getNearbyAvg(aLft, self.avgWin, 'left', ofst=0) rightAvg = self.getNearbyAvg(bRght, 
self.avgWin, 'right', ofst=0) if self.vecWithinEpsilon(rightAvg, leftAvg, epsilon): return True # Main shift identification / correction driving algorithm def correctShifts(self, winLen, t0, t1, avgWin, perc, refRow, compFrac): # Set state for current run self.refRow = refRow self.t0 = t0 self.t1 = t1 self.avgWin = avgWin self.compPerc = perc self.ringAmpCompPerc = perc + 0.05 self.compoundPerc = compFrac self.minAmp = min(self.t1, self.minAmp) self.initRingSet(t0, t1) # if natPtsFlag: # for index in self.natSet: # self.parent.axes[0].annotate(index, (self.times[index], self.data[0][index])) # self.parent.axes[0].plot(self.times[self.natSet], self.data[0][self.natSet], 'o') # if ringPtsFlag: # for index in self.ringSet: # self.parent.axes[0].annotate(index, (self.times[index], self.data[0][index])) # self.parent.axes[0].plot(self.times[self.ringSet], self.data[0][self.ringSet], 'o') # if mlstPtsFlag: # mask = self.parent.dqf[:,11] == 4 # indices = np.arange(0, self.dtaLen) # indices = indices[mask] # self.parent.plotPoints(indices[::25], 1, ptLabels=indices[::25]) maxIndex = self.dtaLen - 1 i = 1 # Check every index for onset property... i = onset, j = potential offset while i < maxIndex: terminates = False termIndex = None # Skip any indices in the MLST glitchy range or otherwise marked to skip if self.inMLSTSkipRange(i) or self.checkIfInSkipSet(i): i += 1 continue offsetLst = [] startVec = self.getDiffVec(i) smallCorrect = False # If an onset is found, calculate its amp and look for nearby onsets if self.checkIfOnset(np.abs(startVec), t0, t1): amp = self.calcAmplitude(i, avgWin) # Get list of potential offsets and their corresponding amplitudes offsetLst = list(self.findOnsetsInWindow(i+1, i + winLen, t0, t1)) if offsetLst == []: # No potential offsets found, skip this onset i += 1 continue offAmpLst = [self.calcAmplitude(z, avgWin) for z in offsetLst] # Get list of indices where two offsets form a spike-like or very # short peak-like shift and skip potential offsets along the start skipOnsets = [] otherLst, otherAmps = offsetLst, offAmpLst for z in range(1, len(otherLst)): a, b = otherLst[z-1], otherLst[z] aAmp = otherAmps[z-1] bAmp = otherAmps[z] if b-a < 15 and self.compareAmps(aAmp, bAmp, self.validityPerc): lft, rght = self.findSlopeChanges(a, self.data[self.refRow]) skipOnsets.extend([i for i in range(lft, rght)]) # Skip onsets near ringing where the amp is not similar to # the amplitude calculated from averages if i in self.filterKeyMap: lftKey, rghtKey = self.filterKeyMap[i] smoothDta = self.filterDtaDict[self.filterKeyMap[i]] lft, rght = self.findSlopeChanges(i-lftKey+self.ringEdgeWin, smoothDta[self.refRow], self.ringSlopeThresh) leftAvg = self.getNearbyAvg(lft+1, avgWin, 'left', dta=smoothDta, ofst=0) rightAvg = self.getNearbyAvg(rght-1, avgWin, 'right', dta=smoothDta, ofst=0) avgAmp = rightAvg - leftAvg if not self.vecWithinPerc(amp, avgAmp, self.ringAmpCompPerc, 2): i += 1 continue else: lft, rght = self.findSlopeChanges(i, self.data[self.refRow]) lftSlopes = [self.getSlope(i, self.data[self.refRow]) for i in range(lft-8, lft-2)] leftAvg = self.getNearbyAvg(lft, avgWin, 'left', ofst=0) rightVal = self.data[:,rght] avgAmp = rightVal - leftAvg if np.mean(np.abs(lftSlopes)) >= 0.3 / self.res: i += 1 continue # Look at each potential offset to see if it forms a simple # shift or a spike-like shift with the given onset i ofstIndex = 0 for j in offsetLst: # Skip offsets in MLST error range if self.inMLSTSkipRange(j): break if j in skipOnsets: ofstIndex += 1 continue termAmp = 
offAmpLst[ofstIndex] # Correct very close, 'peak'-style shifts by setting # nearby values to the interpolated average if self.checkForSpikeShift(i, j, amp, termAmp): self.correctSpike(i, j) smallCorrect = True self.markCorrection('Shift', i, j) i += 1 break # If it's not a spike and does not have a valid onset/offset, move on elif not self.checkIfOnset(np.abs(amp), t0, t1): break elif not self.checkIfOnset(np.abs(termAmp), t0, t1): ofstIndex += 1 continue # Skip terminating offsets whose averaged amplitude # doesn't match its regular amplitude # (for example, small spike-like artificats near large jumps) endAvgAmp = self._calcAvgAmp(j, avgWin) if j not in self.ringSet and not self.vecWithinPerc(endAvgAmp, termAmp, 0.45, 2): ofstIndex += 1 continue # Mark as terminated if an offset is found and there are no # significant unresolved shifts in-between if self.checkForSimpleShift(i, j, amp, termAmp, perc, offsetLst[:ofstIndex]): terminates = True termIndex = j break ofstIndex += 1 # Move on if offsetLst is empty if smallCorrect or offsetLst == []: i += 1 continue sumAmp = -1 ampDiff = -1 # Check if there is a compound shift if the conditions are appropriate if len(offsetLst) > 2: checkRes, uniqLst, sumAmp = self.checkForCompound([i] + offsetLst, [amp] + offAmpLst, perc=self.compPerc) # Adjust sumAmp so its relative to the number of onsets if checkRes: sumAmp = sumAmp/int(len(uniqLst)-2) else: checkRes = False # Compare whether a compound shift or a single simple shift # will be better to use based on the sum of the amplitudes if terminates and checkRes: ampDiff = abs(amp-termAmp*(-1)) sumNorm = self.vecNorm(sumAmp) ampNorm = self.vecNorm(ampDiff) if sumNorm <= ampNorm: terminates = False else: checkRes = False # Compound shift correction debugNum = 0 if checkRes: self.correctCompound(uniqLst) print ('Compound Correction', uniqLst[0], uniqLst[-1]) i = uniqLst[-1] - 1 # Simple shift correction elif terminates: self.correctSingleShift(i, termIndex, amp, termAmp*(-1), avgWin) print ('Simple Shift Correction', i, termIndex) else: i = offsetLst[0] - 1 i += 1 def ffDataDict(ffName): # Open flat file FID = FF_File.FF_ID(ffName, status=FF_File.FF_STATUS.READ | FF_File.FF_STATUS.EXIST) FID.open() # Extract data and column names nRows = FID.getRows() records = FID.DID.sliceArray(row=1, nRow=nRows) ffTime = records["time"] ffData = records["data"] colNames = FID.getColumnDescriptor("NAME")[1:] # Create dictionary ffDict = {} ffDict['Time'] = np.array(ffTime) for i, colName in enumerate(colNames): dta = np.array(ffData[:,i]) ffDict[colName] = dta # Close flat file and return result FID.close() return ffDict def extractData(fn): # Extracts data and assembles 3 x N array from vectors FID = FF_File.FF_ID(fn, status=FF_File.FF_STATUS.READ | FF_File.FF_STATUS.EXIST) FID.open() epoch = FID.getEpoch() ffColumns = FID.getColumnDescriptors() FID.close() ffDict = ffDataDict(fn) kws = ['Bx_IFG', 'By_IFG', 'Bz_IFG'] data = [ffDict[kw] for kw in kws] data = np.stack(data) times = ffDict['Time'] mlst = ffDict['MLST'] return times, data, mlst, ffDict, ffColumns, epoch def setupParser(): parser = argparse.ArgumentParser(description='Correct shifts in Insight data') parser.add_argument('filename', metavar='FFName', help='Flat file to be corrected') parser.add_argument('output', metavar='FFOutput', help='Name of output flat file') # Arg name, type, help argsList = [ ('--winLen', int, 'Number of points to look at to identify potential offsets/onsets'), ('--t0',float, 'First (larger) threshold used to identify a potential 
shift onset'),
        ('--t1', float, 'Second (smaller) threshold used to identify a potential shift onset'),
        ('--avgWin', int, 'Num of points to use when calculating averages + smoothing edges for corrections'),
        ('--compPerc', float, 'Perc that starting amp should be within end amp\'s values when comparing amplitudes'),
        ('--refRow', int, 'Vector row (Bx, By, Bz) = (0, 1, 2) that should be used for identifying changes in slope'),
        ('--compoundFrac', float, 'Percent of max amplitude that the sum of a set of amps needs to be under to count as a compound shift')
    ]
    # NOTE: mv below is never used (metavar='' is passed instead), and
    # argMetaVars has one more entry than argsList, so the zip silently
    # drops the last name; harmless as written
    argMetaVars = ['WL', 't0', 't1', 'avgWin', 'CP', 'RCP', 'refRow', 'compFrac']
    for (name, argType, desc), mv in zip(argsList, argMetaVars):
        parser.add_argument(name, metavar='', type=argType, help=desc)
    args = parser.parse_args()
    return args

def main():
    # Set up argument parser
    args = setupParser()
    # Get filename and extract relevant info
    filename = args.filename
    outputfile = args.output
    times, dta, mlst, ffDict, ffColumns, epoch = extractData(filename)
    # Initialize dta corrector class and quality flag if available
    dqfColName = 'dqf2'
    dtaCorrector = InsightDataCorrector(times, dta, mlst=mlst)
    if dqfColName in ffDict:
        dtaCorrector.loadQualityFlagCol(ffDict[dqfColName])
    # Parse arguments and pass them into dta corrector
    dtaCorrector.processData(args)
    # Rebuild units/sources/labels info
    units = []
    sources = []
    labels = []
    for pos, colName, unit, src, misc1, misc2 in ffColumns[1:]:
        if colName == 'SCET' or colName == 'TIME':
            continue
        units.append(unit)
        sources.append(src)
        labels.append(colName)
    # Add dqf column if missing
    if dqfColName not in labels:
        labels.append(dqfColName)
        units.append('')
        sources.append('')
    # Arrange data to be passed to ffCreator
    ffDict[dqfColName] = dtaCorrector.getQualityFlagCol()
    for kw, row in zip(['Bx_IFG', 'By_IFG', 'Bz_IFG'], [0,1,2]):
        ffDict[kw] = dtaCorrector.data[row]
    ffData = []
    for label in labels:
        if label == 'SCET':
            ffData.append(times)
        else:
            ffData.append(ffDict[label])
    ffData = np.array(ffData).T
    createFF(outputfile, times, ffData, labels, units, sources, epoch)

main()

==============================================================================
Library codes - FF_DESC.py
==============================================================================
# FF_DESC - Column Descriptors handler
# Column descriptors are aligned by the first character
# of each label in the header line
# labels are " #", NAME, UNITS, SOURCE, TYPE, and LOC
# This allows the column widths to be variable but
# must not exceed 71 characters. A space is between " #"
# and "NAME"
# Tasks : from the header file
#   Use the header line to calculate the format string
#   Translate text from the header file to the file descriptor
#   components using the format string
#   Write file descriptor components to formatted text
#   Allow externally resetting the component widths for NAME, UNITS
#   or SOURCE
# There exist 3 ways to set the format
#   Default (3,10,10,26,4,4) (FORMAT,WIDTH)
#   From the header in the flatfile header
#   By widths for the three components: {NAME, UNITS, SOURCE}=int
# NOTE: WIDTH and FORMAT are mutable, so the default values
# can change externally
# ToDos
#   document public attributes and methods
""" flat file column descriptors
each descriptor has column id, name, unit name, source, type,
and location field. These fields are formatted into one
72-character line. The field character widths are fixed.
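For example, a single descriptor record in the default widths looks like
(column values hypothetical):
002 BX_IFG    nT        UCLA/IGPP                 R        8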
field default width type comments col id 3 integer NAME 10 string UNITS 10 string SOURCE 26 string TYPE 4 string Bnn,Ann,I,L,T,R data type nn - multiples of 4 LOC 4 integer location in binary record """ class _FF_DESC: """descriptor class, manages the value attribute saves value/text pair formatted as in the header files calculate the values as used in the data formatted as it should be written into the header file """ def __get__(self, instance, value): # print("getValue", instance._name, instance._value) return instance._list def __set__(self, instance, value): if type(value) == str: instance._text = value instance._list = instance.T2V(instance) else: instance._list = value instance._text = instance.V2T(instance) def __str__(self): return self._text def text(self): return self._text def desc(self): return self._list class FFDESC: # column descriptor class """ usage : desc = FFDESC(ncol, header, {NAME,UNITS,SOURCE}==width ncol = number of columns header = header text (from flatfile) NAME,UNITS,SOURCE = reset character width in header header : 72 character string containing COL NAME UNITS SOURCE TYPE LOC spacing can be variable, but order is fixed """ def __init__(self, ncol, HEADER=None, **kwargs): """ ncol -> number column HEADER -> header line (determines format) NAME,UNITS,SOURCE=width reset widths FORMAT -> None default column widths MAX CHAR(C) FORTRAN col - 3 NAME - 10 NAMELEN 20 BASED ON COLUMN HEADER UNITS - 10 UNITSLEN 32 No character limit units may vary from 8 to 16 SOURCE- 26 SOURCELEN 30 TYPE - 6 TYPELEN 8 LOC - 4 total width needs to be no more 72 characters """ self._col = 0 self._ncol = ncol self._format = FFDESC._FORMAT self._header = FFDESC._HEADER self._widths = list(FFDESC._WIDTHS) self._D2L = FFDESC.desc2List self._L2D = FFDESC.list2Desc self._desc = [_FF_DESC() for i in range(ncol + 1)] for i in range(ncol + 1): self._desc[i] = _FF_DESC() self._getRecord = None if HEADER is not None: # value is the header from the flatfile value = HEADER # calculate how to read the data pos = self._pos = FFDESC._getColumnPosition(value) # need to calculate _widths list widths = [pos[i + 1] - pos[i] for i in range(len(pos[:-1]))] widths[0] = widths[0] - 1 # space between cols and NAME column self._widths = widths self._pos = pos self._setFormat() # **kwargs used for resetting widths if len(kwargs) > 0: self.setFormat(**kwargs) for i in range(ncol + 1): self._desc[i].id = i self._desc[i]._list = self._desc[i]._text = None if HEADER is not None: # Extract labels self._desc[0]._text = value self._desc[0]._list = self._D2L(self, value) else: # default values value = FFDESC._ColList self._col = 0 self._desc[0]._list = FFDESC._ColList self._desc[0]._text = self._L2D(self, value) self._col = 1 value = FFDESC._TimeList self._desc[1]._list = FFDESC._TimeList self._desc[1]._text = self._L2D(self, value) for i in range(ncol): icol = i + 1 self._col = icol nn = "%02d" % icol if i == 0: value = ["COL " + nn, "UNIT " + nn, "SOURCE", "T", i * 4] else: value = ["COL " + nn, "UNIT " + nn, "SOURCE", "R", i * 4 + 4] self._desc[icol]._list = value self._desc[icol]._text = self._L2D(self, value) def __str__(self): """ descriptor contents (column, text form, list form) """ l = [] for i in range(self._ncol): j = i + 1 l.append((j, self._desc[j]._text, self._desc[j]._list)) # l.append(self._desc[i]._text) return str(l) def getDescName(self, col): """ return column name """ dList = self.getList(col) return dList[0] def setDescName(self, col, value): """ set column name """ dList = self.getList(col) dList[0] = 
str(value) self.setDesc(col, dList) def setDesc(self, col, value): """ sets Column Descriptor for a col """ if type(value) is str: # Extract labels self._desc[col]._text = value self._desc[col]._list = self._D2L(self, value) else: # Write header text self._col = col # ldesc = self._desc[col]._list = list([col]) + value ldesc = self._desc[col]._list = value self._desc[col]._text = self._L2D(self, ldesc) def getDesc(self, column): """ return descriptor text for a column """ if column >= len(self._desc): print("getDesc warning: valid column index 1 to ", len(self._desc)) return None # print(col, len(self._desc[column].text())) return self._desc[column].text() def getList(self, col): """ return descriptor list for a col """ return self._desc[col].desc() def desc2List(self, value): """ reads in the text and converts to list""" pos = list(FFDESC._POS) limit = len(pos) - 2 plist = [value[pos[i]:pos[i + 1]].strip() for i in range(limit)] if self._col > 0: plist[0] = int(plist[0]) # column id plist[limit - 1] = int(plist[limit - 1]) # location return plist def list2Desc(self, value): """ formats data to text""" if self._col == 0: tup = tuple(value) string = self._header % tup else: tup = tuple([self._col] + value) string = self._format % tup return string # need setFormat based on header caption def setFormat(self, **kwargs): """ format generater for the flatfile header NAME=width, UNITS=width, SOURCE=width resets widths for NAME, UNITS, SOURCE columns """ keys = [i for i in FFDESC._ColPair] noGo = [] for key, value in kwargs.items(): if key in keys: index = FFDESC._ColPair[key] self._widths[index] = value else: noGo.append((key)) if noGo: print("FFDESC.setFormat Warning:") print(" valid keys are:", keys) print(" ", noGo, "are invalid keys") self._setFormat() for i in range(self._ncol): self._col = i self._desc[i]._text = self._L2D(self, self._desc[i]._list) def _setFormat(self): """ format generater for the flatfile header Uses column widths """ # list = ["%%3d%-", "s%%-", "s%%-", "s%%-4s4%%-d"] DFORM = "%%03d %%-%d.%ds%%-%d.%ds%%-%d.%ds%%-6s%%4d" HFORM = "%%03s %%-%d.%ds%%-%d.%ds%%-%d.%ds%%-6s%%4s" tup = () for i in self._widths[1:-2]: tup = tup + (i, i) self._header = HFORM % tup self._format = DFORM % tup def _setColumnPosition(self): """ calculate the column position by widths""" pos = [0] for i, j in enumerate(self._widths): value = pos[i] + j pos.append(value) self._pos = pos pass def _getColumnPosition(header): """Calculate the column position by caption.""" tags = [" #", "NAME", "UNITS", "SOURCE", "TYPE", "LOC"] pos = [header.find(tag) for tag in tags] pos.append(72) return pos def getFormat(self): """ column descriptor format """ return self._format def getHeader(self): """ column descriptor header """ return self._desc[0] def setHeader(self, inDesc): """ set column descriptor header no validation done """ if type(inDesc) == list: desc = self.list2Desc(inDesc) else: desc = inDesc self._desc[0] = desc return def Print(self): """ print contents""" for i in range(self._ncol): print(i, self._desc[i].desc()) # below are functions which generate put/get functions based on # the content of the types # not done yet def _genGetFunc(self): """ generates the binary get record """ # for now generate the format # form = "<"+ print(self) def _genPutFunc(self): """ generates the binary put record """ pass def _genTYPES(self): """ sets/check types based on type column""" """ sets/check location on type column""" pass # class attributes, these are mutable # ColList -> flatfile Column descriptors labels # 
TimeList -> default 1st column # FORMAT -> default column descriptor format # WIDTHS -> default column descriptor widths # ColType -> flatfile Column descriptors Types _ColPair = {"NAME": 1, "UNITS": 2, "SOURCE": 3} _ColList = [" #", "NAME", "UNITS", "SOURCE", "TYPE", "LOC"] _TimeList = ["TIME", "SEC", "SOURCE", "T", 0] _WIDTHS = [4, 10, 10, 26, 6, 4] # column widths _FORMAT = "%03d %-10.10s%-10.10s%-26.26s%-6s%4d" _HEADER = "%03s %-10.10s%-10.10s%-26.26s%-6s%4s" _POS = [0, 4, 14, 24, 50, 56, 60, 72] # corresponding positions FORMAT = list(_FORMAT) _ColType = [int, str, str, str, str, int] # desc -> ncol list of _FF_DESC() desc = _FF_DESC() if __name__ == '__main__': # change FF_SetInfo to just FF_Info, descriptor class header = " # NAME UNITS SOURCE TYPE LOC" DESC = FFDESC(2, header) # 2 columns, sets header ASET = ["TIME", "SEC", "SOURCE", "T", 0] DESC.setDesc(1, ASET) # set 1st column BSET = ["COL1", "UNIT", "SOURCE", "R", 8] DESC.setDesc(2, BSET) # set 2nd column print("OLD ", DESC) DESC.setFormat(NAME=5, UNITS=5) # reset widths print("NEW ", DESC) ============================================================================= FF_file.py ============================================================================= """ FlatFile usage: ff = FF_ID(fileName, status=STATUS) where STATUS can be one or orred FF_STATUS.{READ,WRITE,CREATE, EXIST,LOCK,APPEND,UPDATE} ff.open() # set activate flatfile object . . ff.close() # clean and close Note: requires numpy 1.14 or newer. """ # ToDos 2015 oct 11 add Resolution to FFInfo import os import struct import numpy from FF_Static import static, enum_base, splitTextBySize, firstSansSubstring, extractKEYS, selectionFromList from FF_INFO import FFINFO, FFPARM, FF_INFO from FF_DESC import FFDESC from FF_Time import FFTIME, lastLeap, FF_ERROR # TO DOS re-organize functions (private and public) # functions, separate class functions vs instance # functions # get vim plugin for todos annotations # put generics to another py file # flatfile info -> tuples (label, value, string, type) # make this a new class where setattr # input string, set value depending on the type # (float,double,string,Time) # Time - tuple (label, double, date) # make FFAttribute class (label, value, string, type) # a property class # with get/set attr functions replaced # get/setatt(self,string) -> saves string but converts value #FLATFILE CONSTANTS FF_ = static(ERROR_FLAG=1e34, FMUNIT=10) FF_STATUS = static(UNKNOWN=0o000, WRITE=0o001, READ=0o002, CREATE=0o004, EXIST=0o010, LOCK=0o020, APPEND=0o040, UPDATE=0o100) FF_EPOCH = static(Y1966="Y1966", Y2000="Y2000", J2000="J2000", Y1958="1958") class _FF_BASE: """ Common attributes for FF_HID & FF_DID """ TYPE = enum_base(tuple, ID=(0, "ID"), HID=(1, "HID"), DID=(2, "DID")) def __init__(self, type, name): self.type = type # object type self.name = name # file name self.loc = 0 # current record position (bytes) self.error = 0 # error message id self.file = 0 # file object self.size = 0 # file size (total bytes) self.nrows = 0 # file size (total record) self.lrecl = 0 # record length self.format = "type %s name %s loc %s error %d file %s size %d rows %d lrecl %d " self.mode = "" # self.resolution = None # hidden attribute # self.dtype = None npv = numpy.__version__ nps = '.'.join(npv.split('.',2)[:2]) npn = float(nps) self.usingNewerNumpy = npn >= 1.14 #need extractors and retrievers here def __str__(self): """ attributes """ return self.format % (str(self.type), self.name, self.loc, self.error, self.file, self.size, self.nrows, self.lrecl) def 
readable(mode): """ read accessiblity """ if mode is None: return False READABLE = set('ra+') return bool(set(mode) & READABLE) def writable(mode): """ write accessiblity """ if mode is None: return False WRITABLE = set('wa+') return bool(set(mode) & WRITABLE) def getRow(self): """ return current row """ if self.file == 0: return None return (self.file.tell() / self.lrecl) + 1 def setRow(self, row): """ set current row """ if (row < 1) and (self.file == 0): return # self.file.seek(0, (row - 1) * self.lrecl) self.file.seek(int((row - 1) * self.lrecl)) # self.loc = self.file.ftell() self.loc = self.file.tell() #these may be superceded by HID and DID classes def cput(self, record, row=None): # text write """put - write a record, reset current row return -1 if failed or bytes written """ if not _FF_BASE.writable(self.mode): return -1 if row is not None: if row < 1 or row > self.nrows: return -1 self.loc = self.lrecl * (row - 1) self.file.seek(self.loc, 0) buffer = record.ljust(self.lrecl) self.file.write(buffer) self.loc = self.loc + self.lrecl return int(self.lrecl) def cget(self, row=None): # text read """get - read a record, reset current row""" # default binary format Time (double, 8 bytes ) # (ncol - 1 ) * float, 4 bytes if not _FF_BASE.readable(self.mode): return -1 if row is not None: if row < 1 or row > self.nrows: return None self.loc = self.lrecl * (row - 1) self.file.seek(self.loc, 0) record = self.file.read(self.lrecl) self.loc = self.loc + self.lrecl return record def put(self, brec, row=None): # binary write """get - write a binary record, reset current row""" # default binary format Time (double, 8 bytes ) # (ncol - 1 ) * float, 4 bytes # does not check for valid lrecl, brec if not _FF_BASE.writable(self.mode): return -1 if row is not None: if row < 1 or row > self.nrows: return None self.loc = self.lrecl * (row - 1) self.file.seek(self.loc, 0) rest = (self.lrecl - 8) // 4 dform = '>d' + 'f' * rest data = list(brec) record = struct.pack(dform, *data) self.loc = self.loc + self.lrecl self.file.write(record) return len(data) def get(self, row=None): # binary read """get - read a binary record, reset current row""" # default binary format Time (double, 8 bytes ) # (ncol - 1 ) * float, 4 bytes if not _FF_BASE.readable(self.mode): return None if row is not None: if row < 1 or row > self.nrows: return None self.loc = self.lrecl * (row - 1) self.file.seek(self.loc, 0) rest = (self.lrecl - 8) // 4 dform = '>d' + 'f' * rest record = struct.unpack(dform, self.file.read(self.lrecl)) self.loc = self.loc + self.lrecl return record def sliceArray(self, row=None, nRow=1, UseColumnName=False): # binary read """ reads data from row of nRow returns numpy data type [array time(nRow), array data(nRow,columns)] """ if not _FF_BASE.readable(self.mode): return None if self.nrows <= 0: print(" FF_File.sliceArray: file has no data") return None if row is None or nRow is None: print(" FF_File.sliceArray: row or nRow is None type") return None if row < 1 or nRow < 1: print(" ff_file.sliceArray row or nRow < 1", row, nRow, self.nrows) return None if row + nRow > self.nrows: nRow = self.nrows - row + 1 if nRow < 1 or nRow > self.nrows: return None self.loc = self.lrecl * (row - 1) self.file.seek(self.loc, 0) # nRows = nRow - row + 1 2015 Jun 29 nRows = nRow l = self.lrecl rest = (l - 8) // 4 if UseColumnName: # with column names name = self.FF_ID.getColumnDescriptor("NAME") dt = [(n, '>f') for n in name] dt[0] = ('time', 'd') else: # no names dt = [("time", '>d'), ('data', '>f4', (rest))] name = self.name # 
# byte string conversion here is a numpy 1.14 compatibility issue # https://github.com/numpy/numpy/blob/master/doc/release/1.14.0-notes.rst # Assignment from a structured array to a boolean array now raises a ValueError, unlike in 1.13, where it always set the destination elements to True. # Assignment from a structured array with more than one field to a non-structured array now raises a ValueError. In 1.13 this copied just the first field of the source to the destination. if self.usingNewerNumpy: # np >= 1.14; this behavior changed a couple of versions ago, but keeping the check just in case return numpy.memmap(name, dtype=dt, mode='r', offset=self.loc, shape=None) else: print(" FF_File.sliceArray: requires numpy 1.14 or newer") exit(1) # The code below was found not to work, so we now require numpy 1.14 or newer. # nBytes = l * nRows # bytes = self.file.read(nBytes) # x = numpy.zeros(nRows, dtype=dt) # for i in range(nRows): # jO = i * l # jE = (i + 1) * l - 1 # x[i] = bytes[jO:jE] # return x def arrayToRecord(array, nCols): """ converts an array of columns to an array of records :param array: array of columns :param nCols: number of columns to convert :return: returns array of flatfile records """ records = tuple(zip(*array[:nCols])) return records def arrayToColumns(array): """ converts array(m,n) to array(n,m) :param array: numpy.array (M, N) :return: returns numpy.array (N, M) """ size = array.size shape = array.shape[::-1] cols = numpy.reshape(array, size, order='F').reshape(shape) return cols def timeIndex(time, t, dt=0.0005): """ :param time: time vector :param t: value to search for :param dt: margin for error :return: index of the value closest to t """ if t <= time[0]: return 0 if t >= time[-1]: return len(time) - 1 closest = numpy.argmin(abs(time - t)) return closest class FF_DID (_FF_BASE): """ Flatfile Data """ def __init__(self, name): _FF_BASE.__init__(self, _FF_BASE.TYPE.DID, name + ".ffd") class FF_HID (_FF_BASE): """ FlatFile Header """ _FORMAT = "%-72.72s" _DATA_TYPE = static(TIME=(0o000, "T", 8), DOUBLE=(0o001, "D", 8), FLOAT=(0o002, "R", 4), LONG=(0o003, "I", 4), ULONG=(0o004, "U", 4), SHORT=(0o005, "S", 2), USHORT=(0o006, "US", 2), BYTE=(0o007, "B", 1), CHAR=(0o010, "A", 1)) def __init__(self, name): self.hasEpoch = 0 # if file contains "EPOCH = YYYYY" record self.fmunit = FF_.FMUNIT # unit column char size _FF_BASE.__init__(self, _FF_BASE.TYPE.HID, name + ".ffh") self.lrecl = 72 self.abstractPos = 0 # record where the Abstract begins def __str__(self): return _FF_BASE.__str__(self) + "ABSTRACT " + str(self.abstractPos) def display(self, start=1, nrows=0): """ displays header contents """ if start < 1 or start > self.nrows: return pos0 = 0 if start == 1 else (start - 1) * self.lrecl size = self.size - pos0 if nrows == 0 else nrows * self.lrecl self.file.seek(pos0, 0) text = self.file.read(size) list = splitTextBySize(text, self.lrecl) for item in list: print(item) self.file.seek(self.loc, 0) def _getColumnPosition(header): """Calculate the column position by caption.""" tags = [" #", "NAME", "UNITS", "SOURCE", "TYPE", "LOC"] pos = [header.find(tag) for tag in tags] pos.append(72) return pos def setUnitFormat(self, unitFormat=FF_.FMUNIT): """ set unit label length """ self.fmunit = unitFormat def getUnitFormat(self): """ return unit label length""" return self.fmunit # use col = BYTE(-1),size FF_ETYPE = enum_base(tuple, TIME=(0o000, "T", 8), DOUBLE=(0o001, "D", 8), FLOAT=(0o002, "R", 4), LONG=(0o003, "I", 4), ULONG=(0o004, "U", 4), SHORT=(0o005, "S", 2), USHORT=(0o006, "US", 2), BYTE=(0o007, "B", 1), CHAR=(0o010, "A", 1))
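# ----------------------------------------------------------------------
# Example (added for documentation, not part of the original library):
# reading rows through sliceArray() above; a sketch assuming an already
# opened flatfile instance named ff:
#
#     chunk = ff.DID.sliceArray(row=1, nRow=1000)
#     t = chunk['time']    # float64 seconds since the file epoch
#     b = chunk['data']    # big-endian float32, shape (nRows, ncols - 1)
#
# With UseColumnName=True the fields are instead addressed by the column
# names taken from the header file.
# ----------------------------------------------------------------------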
""" Flatfile """ # remove the FF_ prefix, redundant # FF_INFO are attribute,labels pairs, attributes parallel to FF_ID # attributes # FF_EPOCH = {"Y1966": 19660, "Y2000": 20000, "J2000": 20005, "Y1958": 195800} FF_OPSYS = {"UNKNOPS": 0, "UNIXOPS": 1, "SUNOPS": 2, "Microsoft Windows": 3, "MACOPS": 4} # FF_DATAT = {"TIME66": 0o000, "DOUBLE": 0o001, "FLOAT": 0o002, "LONG": 0o003, # "ULONG": 0o004, "SHORT": 0o005, "USHORT": 0o006, "BYTE": 0o007, # "CHAR": 0o010} # FF_PARMS -> corresponding labels in the FF_ID instance FF_PARMS = ("STATUS", "LRECL", "NCOLS", "NROWS", "OPSYS", "EPOCH") FF_EPOCH = static(Y1966="Y1966", Y2000="Y2000", J2000="J2000", Y1958="1958") # FF_INFO labels as shown in the header file # FF_INFO = static(FIRST_TIME="FIRST TIME", LAST_TIME="LAST TIME", # RESOLUTION="AVERAGE INTERVAL", ERROR_FLAG="MISSING DATA FLAG", # OWNER="OWNER", ORBITS="ORBIT NUMBER(S)") def __init__(self, name, status=FF_STATUS.EXIST, recl=0, ncols=0, nrows=0, opsys="UNIXOPS", epoch=FF_EPOCH.Y2000, version=None): """ keywords: status,recl,ncols, nrows, opsys, epoch, version """ self.silent = True self.type = _FF_BASE.TYPE.ID #check for name self.name = name self.version = version # FlatFile parameters : required self.status = status # status word FFParmAttr = FFPARM._AttList FFParmList = [FFPARM("DATA", name + ".ffd", TYPE=str), FFPARM("CDATE", 0), FFPARM("RECL", recl, TYPE=int, FORMAT="%6d"), FFPARM("NCOLS", ncols, TYPE=int, FORMAT="%6d"), FFPARM("NROWS", nrows, TYPE=int, FORMAT="%11d"), FFPARM("OPSYS", opsys, TYPE=str), FFPARM("EPOCH", epoch, TYPE=str)] self.FFParm = dict(zip(FFParmAttr, FFParmList)) # FlatFile information: derived from Abstract section FFInfoAttr = FFINFO._AttList FFINFOList = [FFINFO("FIRST_TIME", 0, SAVE=True), FFINFO("LAST_TIME", 0, SAVE=True), FFINFO("RESOLUTION", "HH:MM:SS.sss"), FFINFO("ERROR_FLAG", FF_.ERROR_FLAG, TYPE=float, FORMAT="%13.6e", SAVE=True), FFINFO("OWNER", "UCLA/IGPP", SAVE=True), FFINFO("ORBITS", "001 - 001", FORMAT="%3d - %3d", SAVE=False), FFINFO("LAST_LEAP", "UNKNOWN", SAVE=False), FFINFO("APOGEE", "0", SAVE=False)] self.FFInfo = dict(zip(FFInfoAttr, FFINFOList)) # STRUCTURE self.HID = FF_HID(name) self.DID = FF_DID(name) self.DID.lrecl = recl self.HID.FF_ID = self # allow access to common variables self.DID.FF_ID = self # allow access to common variables self.mode = None # determines how to open the flat file pairs self.format = "TYPE %s " self.resolution = None # internal attributes self.dtype = None self.abstractPos = 0 def __str__(self): """ output flatfile pararmeters, column descriptors, information header and data """ vars = self.format % (str(self.type)) parms = "\n PARMS " + str(self.getParameters()) desc = "\n DESCS " + str(self.getColumnDescriptors()) info = "\n INFO " + str(self.getInfo()) HID = "\n HEADER " + self.HID.__str__() DID = "\n DATA " + self.DID.__str__() output = vars + parms + desc + info + HID + DID return output def _extractParameters(self, header): """Extract FF_PARM attributes from header(string)""" endParm = header.index(' # NAME') parmData = extractKEYS(header[0:endParm], self.HID.lrecl) for key in parmData: self.FFParm[key].info = parmData[key] self.DID.lrecl = self.FFParm["RECL"].value # if Epoch key not found then the file in in cline time if "EPOCH" not in parmData: self.FFParm["EPOCH"].info = "Y1966" def _extractColumnDescriptors(self, header): """ extract column Descriptors from header content""" lrecl = self.HID.lrecl begCols = header.index(' # NAME') # Note : this work only if ABSRACT IS A THE 1st char # recalibrate 
endCols = self._AbstractPos(header) KEYS = splitTextBySize(header[begCols:endCols], lrecl) # pos = [0] + FF_HID._getColumnPosition(KEYS[0]) ncols = len(KEYS) colDesc = FFDESC(ncols, HEADER=KEYS[0]) for i in range(ncols): colDesc.setDesc(i, KEYS[i]) self.colDesc = colDesc self.descriptPos = begCols // lrecl self.abstractPos = endCols // lrecl def _AbstractPos(self, header): """file ptr where ABSTRACT tag is written """ lrecl = self.HID.lrecl pos = int(header.index('ABSTRACT') / lrecl) * lrecl return pos def _extractInfo(self, header): """Extract FF_INFO attributes from header(string)""" # Not all FF_INFO values may occur in the file lrecl = self.HID.lrecl index0 = self._AbstractPos(header) + lrecl seq = header[index0:] strings = splitTextBySize(seq, lrecl) index = firstSansSubstring(strings, "=") infoList = [list.strip().split(" = ") for list in strings[:index]] infoDict = dict([K[0].strip(), K[1].strip()] for K in infoList) self.HID.AbstractPos = int((index0 + index * lrecl) / lrecl + 1) EPOCH = self.FFParm["EPOCH"].value # if EPOCH != "Y2000": # default time self.FFInfo["FIRST_TIME"].setEpoch(EPOCH) self.FFInfo["LAST_TIME"].setEpoch(EPOCH) for key in infoDict: # find matching attribute attribute = FF_INFO.__findAtt__(key) if attribute is not None: self.FFInfo[attribute].info = infoDict[key] if self.FFInfo['ERROR_FLAG']: self.error = self.FFInfo['ERROR_FLAG'].value self.DID.error = self.FFInfo['ERROR_FLAG'].value def _saveParameters(self): FFParmAttr = FFPARM._AttList self.HID.file.seek(0, 0) for key in FFParmAttr: buffer = "{:<5} = {}".format(key, self.FFParm[key].info) buffer = FF_HID._FORMAT % (buffer) self.HID.file.write(buffer) self.HID.PDESC = self.HID.file.tell() def _saveColumnDescriptors(self): ncol = self.FFParm["NCOLS"].value if hasattr(self, "colDesc"): for i in range(ncol + 1): # include header buffer = self.colDesc.getDesc(i) buffer = FF_HID._FORMAT % (buffer) self.HID.file.write(buffer) self.HID.PABSTITLE = self.HID.file.tell() else: print("FF_ID warning: Column descriptors are not defined") def _saveInfo(self): FFINFOList = FFINFO._AttList for key in FFINFOList: if self.FFInfo[key]._save: label = "%-18s" % getattr(FF_INFO, key) buffer = label + " = " + self.FFInfo[key].info buffer = FF_HID._FORMAT % (buffer) self.HID.file.write(buffer) self.HID.PABSTEXT = self.HID.file.tell() def _encodeStatus(status): KEYS = ["WRITE", "READ", "CREATE", "EXIST", "LOCK", "APPEND", "UPDATE"] string = "" for i in range(0, len(KEYS)): if status & 2 ** i: if string != "": string = string + " | " string = string + KEYS[i] return string def _decodeStatusString(statusString): selection = list(statusString.split()) KEYS = ["WRITE", "READ", "CREATE", "EXIST", "LOCK", "APPEND", "UPDATE"] status = 0 items = selectionFromList(KEYS, selection) for item in items: status = status | pow(2, item) return status def getFileMode(status): """ calculate which mode for the open method usage _decodeStatus(status=int) or'ed bitflags, refer to FF_STATUS, print(FF_STATUS) usage _decodeStatus(status=string) string can be any combination of KEYS = ["WRITE", "READ", "CREATE", "EXIST", "LOCK", "APPEND", "UPDATE"] """ # need to filter out the lock bit # Note : if APPEND the mode for the header needs to be "w" # MODE | C | PYTHON # r | read & exists | read only # w | create/overwrite | write only, erase # a | keep old, add | append only # r+ | exist, read/write | read, write # w+ | read/write,overwrite| read/write,overwrite # a+ | keep old, append | read/write, keep old # TAGS -> ORIGINAL C mode, keys, python mode
for header and data # Allow ability to read/update the header file if exists CREATEWRITE = ("w", FF_STATUS.CREATE | FF_STATUS.WRITE, "w", "wb") EXISTCREATEWRITE = ("w", FF_STATUS.EXIST | FF_STATUS.CREATE | FF_STATUS.WRITE, "w+", "wb") OVERWRITE = ("w+", status, "w+", "wb+") READONLY = ("r", FF_STATUS.EXIST | FF_STATUS.READ, "r", "rb") EXISTREADWRITE = ("r+", FF_STATUS.EXIST | FF_STATUS.READ | FF_STATUS.WRITE, "r+", "rb+") APPEND = ("a+", FF_STATUS.APPEND, "r+", "ab+") UPDATE = ("r+", FF_STATUS.UPDATE, "r+", "rb+") # table based from ff_igpp.c # No indices for 6 and 15 --> CREATE and READ are mutually exclusive hash = {0: CREATEWRITE, FF_STATUS.CREATE | FF_STATUS.WRITE: CREATEWRITE, FF_STATUS.WRITE: CREATEWRITE, FF_STATUS.CREATE: CREATEWRITE, FF_STATUS.CREATE | FF_STATUS.READ | FF_STATUS.WRITE: OVERWRITE, FF_STATUS.EXIST | FF_STATUS.READ: READONLY, FF_STATUS.READ: READONLY, FF_STATUS.EXIST: READONLY, FF_STATUS.EXIST | FF_STATUS.READ | FF_STATUS.WRITE: EXISTREADWRITE, FF_STATUS.EXIST | FF_STATUS.WRITE: EXISTREADWRITE, FF_STATUS.READ | FF_STATUS.WRITE: EXISTREADWRITE, FF_STATUS.EXIST | FF_STATUS.CREATE | FF_STATUS.WRITE: EXISTCREATEWRITE, FF_STATUS.EXIST | FF_STATUS.CREATE: EXISTCREATEWRITE, FF_STATUS.EXIST | FF_STATUS.CREATE | FF_STATUS.READ: ("w", status, "w", "wb") } status = status & ~FF_STATUS.LOCK # filter the LOCK bit mode = None if status == FF_STATUS.APPEND: mode = APPEND if status == FF_STATUS.UPDATE: mode = UPDATE if mode is None: mode = hash[status] return mode def checkFileStatus(self): """check whether status used is valid""" path = os.path.split(self.HID.name)[0] ErrMess = [] ErrCont = 0 if path == '': path = os.path.realpath(path) readable = _FF_BASE.readable(self.mode[0]) writable = _FF_BASE.writable(self.mode[0]) #check path ErrMess = [] table = ((0, os.access(path, os.F_OK) is False, "Path does not exist"), (1, (not os.access(path, os.R_OK)) & readable, "Path has no read access"), (2, (not os.access(path, os.W_OK)) & writable, "Path has no write access")) for (key, test, mess) in table: if test is True: ErrMess.append(mess) ErrCont = ErrCont + key ** 2 for (key, name, label) in (1, self.HID.name, "Header"), (2, self.HID.name, "Data"): FOK = os.access(name, os.F_OK) ROK = os.access(name, os.R_OK) WOK = os.access(name, os.W_OK) self.valid = True if ((not FOK) & (not writable)): self.valid = False ErrMess.append(label + " File does not exist") ErrCont = ErrCont + (key * 3) ** 1 if (not ROK) & readable & FOK is True: self.valid = False ErrMess.append(label + " File has no read access") ErrCont = ErrCont + ((key * 3) + 1) ** 2 if (not WOK) & FOK: self.valid = False ErrMess.append(label + " File has no write access") ErrCont = ErrCont + ((key * 3) + 2) ** 2 return [ErrCont, ErrMess] def getParm(self): """ pretty Print Pareamters""" return(2) def open(self): """ opens flatfile """ # Tasks : # set mode argument for the open method # verify access status for path, header and data files # lock the files if flagged # Extract flatfile attributes from existing file # ToDo : set as private, open only once per instance, # rename this as _open_ if not self.silent: print("FFFILE.open:Opening " + self.name) statusInt = self.status if isinstance(self.status, str): statusInt = FF_ID._decodeStatusString(self.status) self.mode = FF_ID.getFileMode(statusInt) self.HID.mode = self.mode[2] self.DID.mode = self.mode[3] fileStatus = self.checkFileStatus() readData = False if os.path.isfile(self.HID.name): readData = True if fileStatus[0] != 0: print("CHECKING ", self.name) status = 
FF_ID._encodeStatus(self.status) print("FF_File Error : directory or file status contradicts the settings.") print("FF_FILE Status Settings : ", status) for error in fileStatus[1]: print(" " + error) return -fileStatus[0] try: # import locale # print(locale.getpreferredencoding(False)) # jfc3 6-21-2018, sometimes default encodings are 'gbk' and 'utf-8', problem arose from files saved with bx self.HID.file = open(self.HID.name, mode=self.mode[2], encoding='cp1252') if FF_STATUS.LOCK & statusInt: print("FF_File Warning : LOCK feature deprecated") except Exception: status = FF_ID._encodeStatus(self.status) print("FF_File Error: Unable to open '" + self.name + "'") print(" : settings = ", self.status, status) return -1 # READ IF EXISTS, store positions where the parm, column descriptors, # and information are stored # jfc3 6-21-2018, adding try/except in case this fails somehow so the program doesn't crash try: if readData is True: # read parms, cols, info self.HID.size = size = os.path.getsize(self.HID.name) if (size != 0): self.HID.nrows = size // self.HID.lrecl # read flatfile parameters and set attributes header = self.HID.file.read(size) self._extractParameters(header) self._extractColumnDescriptors(header) self._extractInfo(header) self.HID.file.seek(0, 0) except Exception as e: print(e) return -1 try: self.DID.file = open(self.DID.name, mode=self.mode[3]) self.DID.nrows = self.FFParm["NROWS"].value self.nrows = self.DID.nrows except Exception: status = FF_ID._encodeStatus(self.status) print("FF_File Error: Unable to open '" + self.name + "'") print(" : settings = ", self.status, status) return -1 if not hasattr(self, "colDesc"): self.colDesc = FFDESC(self.FFParm["NCOLS"].value) return 1 def close(self): """ closes flatfile """ # here save the file attributes # if mode is read only: do nothing # if mode is update : update PARM, INFO, ABSTRACT # if mode is create or overwrite : PARM, DESC, INFO, ABSTRACT if self.mode is None: return if not self.silent: print("CLOSING", self.name) # if self.mode[2][0] != 'r': # print("UPDATE HEADER", self.name) if (len(self.mode[2]) == 2) or (self.mode[0][0] == 'a'): # data is updated/modified # parm - nrows, UPDATE # info - first time, last time, orbit numbers # info and column descriptors are not affected print(" Updating", self.name) UDATE = self.FFParm["CDATE"] UDATE.value = -1 UDATE._format = "%s" + 6 * " " + "UPDATE= %s" UDATE._type = list self._saveParameters() self.HID.file.seek((self.abstractPos + 1) * self.HID.lrecl, 0) self._saveInfo() self._saveAbstract() if self.mode[0][0] == 'w': # OVERWRITE DATA self._saveParameters() self._saveColumnDescriptors() abstract = "ABSTRACT" if self.version is not None: abstract = abstract + " " + self.version self.HID.file.write(FF_HID._FORMAT % abstract) self._saveInfo() self._saveAbstract() self.HID.file.write(FF_HID._FORMAT % "END") if self.HID: if self.HID.file: self.HID.file.close() self.HID = None if self.DID: if self.DID.file: self.DID.file.close() self.DID = None def setSilent(self): self.silent = True def setVerbose(self): self.silent = False def writeParms(self): """ write parameter pairs to header""" pass def getParameters(self): """ return tuple (name, value) status, data record length, number of columns, number of data rows, epoch """ FFParmAttr = FFPARM._AttList parm = [(self.FFParm[key]._name, self.FFParm[key].info, self.FFParm[key].value) for key in FFParmAttr if self.FFParm[key]._save] return parm def getColumnDescriptors(self): """ return list(col, name, unit, source, type, loc) """ if not hasattr(self, "colDesc"): return None
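# Example (added for documentation, not part of the original library):
# the returned list has one tuple per column plus the header row;
# illustrative values only:
#     [(' #', 'NAME', 'UNITS', 'SOURCE', 'TYPE', 'LOC'),
#      ('1', 'TIME', 'SEC', 'IFG', 'T', '0'), ...]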
out = [] ncols = self.FFParm["NCOLS"].value + 1 for i in range(ncols): desc = self.colDesc.getList(i) out.append(tuple(desc)) return out def getColumnDescriptor(self, key="NAME"): """ return list( one of col, name, unit, source, type, loc) """ tags = ["NAME", "UNITS", "SOURCE", "TYPE", "LOC"] pos = tags.index(key) + 1 out = [] ncols = self.FFParm["NCOLS"].value for i in range(ncols): out.append(self.colDesc.getList(i + 1)[pos]) return(out) def getInfo(self): """ return tuple (name, string value, value) first_time,last_time,resolution,flag; string owner, orbits Output items that are set. """ # leel 16/02/9 orbits and apsis are optional, need to check FFInfoAttr = FFINFO._AttList info = [(self.FFInfo[key]._name, self.FFInfo[key].info, self.FFInfo[key].value) for key in FFInfoAttr if self.FFInfo[key]._save] return info def setParameters(self, **kwargs): """set instance parameters """ keys = FFPARM._AttList noGo = [] for key, value in kwargs.items(): if key in keys: self.FFParm[key].info = value else: noGo.append((key)) if noGo: print("FF.setParameters Warning:") print(" valid keys are:", keys) print(" ", noGo, "are invalid keys") def setInfo(self, **kwargs): keys = FFINFO._AttList noGo = [] epoch = self.FFParm["EPOCH"] # self.FFInfo["LAST_LEAP"].setLastLeap() for key, value in kwargs.items(): if key in keys: self.FFInfo[key].info = value if key == "FIRST_TIME": self.FFInfo["FIRST_TIME"].setEpoch(epoch.value) if key == "LAST_TIME": self.FFInfo["LAST_TIME"].setEpoch(epoch.value) if key == "LAST_LEAP": continue else: # if key is "EPOCH": # self.FFInfo["FIRST_TIME"].setEpoch(epoch.value) # self.FFInfo["LAST_TIME"].setEpoch(epoch.value) # print(key) # else: noGo.append((key)) if noGo: print("FF.setInfo Warning:") print(" valid keys are:", keys) print(" ", noGo, "are invalid keys") def getAbstract(self): """ flatfile abstract """ abstract = [] self.HID.file.seek(0, 0) text = self.HID.file.read(self.HID.size) pos = self._AbstractPos(text) list = splitTextBySize(text[pos:], self.HID.lrecl) index = firstSansSubstring(list[1:], "=") + 1 abstract = [item.strip() for item in list[index:-1]] self.HID.file.seek(self.HID.loc, 0) return abstract def setAbstract(self, abstract): # abstract , user defined abstract here # written when file is closed self.abstractAdd = abstract def updateAbstract(self, list): """replaces old Abstract""" if not _FF_BASE.writable(self.HID.mode): return -1 self.HID.file.seek(0, 0) text = self.HID.file.read(self.HID.size) lrecl = self.HID.lrecl pos = self._AbstractPos(text) text = splitTextBySize(text[pos:], lrecl) index = firstSansSubstring(text[1:], "=") + 1 pos = pos + index * lrecl self.HID.file.seek(pos, 0) for item in list: buffer = item.ljust(lrecl) self.HID.file.write(buffer) buffer = "END".ljust(lrecl) self.HID.file.write(buffer) self.HID.file.truncate() self.HID.loc = self.HID.size = self.HID.file.tell() return len(list) def appendAbstract(self, list): """append to Abstract section""" if not _FF_BASE.writable(self.HID.mode): return -1 lrecl = self.HID.lrecl pos = self.HID.size - lrecl self.HID.file.seek(pos, 0) for item in list: buffer = item.ljust(lrecl) self.HID.file.write(buffer) self.HID.loc = self.HID.size = self.HID.file.tell() return len(list) def _saveAbstract(self): """append to Abstract section""" # abstract items should not be more than 72 characters if not hasattr(self, "abstractAdd"): return if not _FF_BASE.writable(self.HID.mode): return -1 lrecl = self.HID.lrecl for item in self.abstractAdd: buffer = item.ljust(lrecl) if (len(buffer) > 72): buff1 = 
buffer[0:72] self.HID.file.write(buff1) buff2 = buffer[72:].ljust(lrecl) self.HID.file.write(buff2) else: self.HID.file.write(buffer) self.HID.loc = self.HID.size = self.HID.file.tell() return len(self.abstractAdd) def dataBuffer(self, startRow=1, stopRow=-1, time="offset"): """ return data array[startRow:stopRow][1:ncols]""" # keys = ["class", "offset", "date"] # cols = self.FFParm["NCOLS"].value if stopRow == -1: stopRow = self.FFParm["NROWS"].value # nRow = stopRow - startRow + 1 # da = [nRow][cols] pass def getTime(self, row=None): # binary read """getTime - get time, without resetting the current row""" # default binary format Time (double, 8 bytes ) # (ncol - 1 ) * float, 4 bytes if not _FF_BASE.readable(self.mode): print("NOT READABLE") return None DID = self.DID if row is not None: if row < 1 or row > DID.nrows: print("getTime: out of bounds Rows:", row, "TOTAL ", DID.nrows) return None loc = DID.lrecl * (row - 1) DID.file.seek(loc, 0) rest = (DID.lrecl - 8) // 4 dform = '>d' + 'f' * rest record = struct.unpack(dform, DID.file.read(DID.lrecl)) DID.file.seek(DID.loc, 0) # return to last position return record[0] def getTimeSpan(self): # start and stop time of file first = self.FFInfo["FIRST_TIME"] last = self.FFInfo["LAST_TIME"] return first, last def ffsearch(self, timeTick, startRow, stop_Row, state=0, Verbose=False): """find record by timeTick, state: -1=floor, 0=closest, 1=ceil""" nrows = self.DID.nrows stRow = max(1, startRow) spRow = min(nrows, stop_Row) tO = self.getTime(row=stRow) tE = self.getTime(row=spRow) rO = startRow rE = stop_Row same = tO == timeTick or tE == timeTick tolerance = max(FF_ERROR, self.resolution) margin = (tO - timeTick) > tolerance or (tE - timeTick) < tolerance if same: if tO == timeTick: return 1 if tE == timeTick: return self.DID.nrows if margin: if Verbose: FFTO = FFTIME(tO, Epoch=self.getEpoch()) FFTE = FFTIME(tE, Epoch=self.getEpoch()) TICK = FFTIME(timeTick, Epoch=self.getEpoch()) print("ffsearch", timeTick, "is out of bounds", self.getEpoch()) print(" tO", tO, "tE", tE, "TimeTick", timeTick) print(" tO", FFTO.UTC, "tE", FFTE.UTC, "TimeTick", TICK.UTC) print(" tO - timeTick", tO - timeTick, "tE - timeTick", tE - timeTick) print(" stRow ", stRow, "spRow", spRow) if timeTick < tO: diff = abs(timeTick - tO) print(" timeTick before first record", tO, diff) if abs(diff) < tolerance: print(" set to first record", 1) return 1 if timeTick > tE: diff = abs(timeTick - tE) print(" timeTick after last record", tE, diff) if diff < tolerance: print(" set to last record", nrows) return nrows epoch = self.FFParm["EPOCH"].value time_UTC = FFTIME(timeTick, Epoch=epoch).UTC startUTC = FFTIME(tO, Epoch=epoch).UTC stop_UTC = FFTIME(tE, Epoch=epoch).UTC[9:] if Verbose: print(" TIME", time_UTC) print(" Search BOUNDS", startUTC, "to", stop_UTC) print("FFSEARCH RETURN NONE") return None found = False while not found: dtdr = (tE - tO) / (rE - rO) r = int((timeTick - tO) / dtdr) + rO time = self.getTime(row=r) if time is None: print("ffsearch no time") return None tdt = abs(time - timeTick) found = tdt < dtdr or tdt < tolerance if found: rmn = max(r - 1, 1) rmx = min(r + 1, self.DID.nrows) if state == -1: return r if time < timeTick else rmn if state == +1: return r if time > timeTick else rmx if state == 0: dmn = abs(self.getTime(rmn) - timeTick) dmc = abs(time - timeTick) dmx = abs(self.getTime(rmx) - timeTick) mna = min(dmn, dmc, dmx) if dmn == mna: return rmn if dmc == mna: return r if dmx == mna: return rmx if time > timeTick: # determine which half contains the row of timeTick rE =
r tE = time else: rO = r tO = time if rO > rE: print("ffsearch: binary search failed; this should not happen") return None def getDataType(self): """ return dtype used for numpy records.fromfile """ if not self.dtype: # generate the struct """ later, use the type attribute from FFDESC to generate a more flexible dataType""" # TBBLSRAIU # only float was tested 12/05/13 leel keyDict = {'T': "f8", 'R*8': "f8", 'R8': "f8", 'D': "f8", 'R': "f4", 'R*4': "f4", 'R4': "f4", 'F': "f4", 'I': "i4", 'I*4': "i4", 'I4': "i4", 'L': "L", 'S': "i2", 'I*2': "i2", 'I2': "i2", 'U': "u4", 'U*4': "u4", 'UI': "u4", 'A': "a1", 'B': "V"} # default - time, array floats ncols = self.FFParm["NCOLS"].value self.simple = "f8" + (ncols - 1) * ",f4" types = self.getColumnDescriptor(key="TYPE") names = self.getColumnDescriptor(key="NAME") uNames = set(names) duplicate = False if len(uNames) < len(names): duplicate = True print("FF_FILE : Duplicate column names") dup = [i for i, x in enumerate(names) if names.count(x) > 1] cNames = [x for x in names] for i in dup: cNames[i] = names[i] + str(i) # print(names) # print(dup) # quit() forms = [keyDict[type] for type in types] if forms is None: print("getDataType: types not recognized: ", types) # from the header file, fields are f0, f1 etc format = ",".join(forms) self.dtype = numpy.dtype(format) # set fields by Column Names does not work # still f0 # pair = zip(names,forms) # nameForm = [(name, format) for name, format in pair] # print(nameForm) if not duplicate: self.dtypeByName = {'names': names, 'formats': forms} self.dtype.names = names else: self.dtype.names = cNames self.dtypeByName = {'names': cNames, 'formats': forms} # print("FIELDS ", self.dtypeByName.fields) return self.dtypeByName def getSlice(self, startRow, stopRow): nrows = self.FFParm["NROWS"].value recl = self.FFParm["RECL"].value rO = max(startRow, 1) rE = min(stopRow, nrows) nr = rE - rO + 1 offset = recl * (startRow - 1) dType = self.getDataType() # format = ",".join(dType['formats']) row = self.DID.getRow() self.DID.setRow(1) d = numpy.core.records.fromfile(self.DID.file, offset=offset, formats=dType['formats'], shape=nr, byteorder=">") self.DID.setRow(row) return d # 2015 Oct 8 refactor def getError(self): # from FFINFO return self.FFInfo['ERROR_FLAG'].value def getNColumns(self): # assume old and opened return self.FFParm['NCOLS'].value def getEpoch(self): # assume old and opened return self.FFParm['EPOCH'].value # 2015 Oct 8 refactor def getErrorFlag(self): # assume old and opened return self.DID.error def getRows(self): """ current row """ # assume old and opened, number of rows in file return self.FFParm['NROWS'].value def getFirst(self): """ first time """ # assume old and opened return self.FFInfo['FIRST_TIME'].value def getLast(self): """ last time """ # assume old and opened return self.FFInfo['LAST_TIME'].value def getResolution(self): """ file resolution """ if not self.resolution: if self.getRows() == 0: print(self.name, " has no data") return None sample = min(20, self.getRows()) # sometimes POS data, for example, doesn't even have 20 entries slice = self.getSlice(1, sample) times = slice['f0'] dt = [times[i + 1] - times[i] for i in range(0, sample - 1)] dtRaw = min(dt) dtInt = int(10000 * dtRaw) / 10000.
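# Example (added for documentation, not part of the original library):
# for the 0.2 Hz IFG data the smallest first-difference of the first 20
# time tags is nominally 5.0 s, so dtInt, and hence the stored
# resolution, would be 5.0.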
self.resolution = dtInt return self.resolution # new -> extract data by slices def slice(self, start, stop_, Time=False): """ multiple records from start to stop time Time = False start/stop_ are records Time = True start/stop_ are time ticks """ if Time: # TODO time part not vetted nrows = self.DID.nrows startTick = start if type(start) is float else start.ticks stop_Tick = stop_ if type(stop_) is float else stop_.ticks firstRec = self.ffsearch(startTick, 0, nrows) last_Rec = self.ffsearch(stop_Tick, 0, nrows) if firstRec is None or last_Rec is None: err = 2 msg = "FF.slice: Times are out of bounds" return err, msg else: firstRec = start last_Rec = stop_ return (self.sliceByRecord(firstRec, last_Rec)) def sliceByRecord(self, startRec, stop_Rec): """ obtain the multiple records spanned by startRec and stop_Rec :param startRec: start record :param stop_Rec: stop record :return: array of records """ validSpan = True if startRec < 0 or stop_Rec < 0: validSpan = False if startRec >= stop_Rec: validSpan = False if startRec > self.DID.nrows + 1 or stop_Rec > self.DID.nrows + 1: validSpan = False if not validSpan: err = 1 msg = "FF.slice error: invalid span max rows", self.DID.nrows, startRec, stop_Rec # print("FF.slice error: invalid span max rows", self.DID.nrows) return err, msg slice = [] self.DID.setRow(startRec) for i in range(startRec, stop_Rec - 1): rec = self.DID.get() lrec = list(rec) # lrec[0] = FFTIME(rec[0], Epoch=self.FFParm["EPOCH"].value) slice.append(lrec) return slice, "OKAY" def inTimeSpan(self, timeTick, Span=0): """ timeTick in file Span : margin of tolerance """ # span, margin of error tO = self.FFInfo["FIRST_TIME"].value tE = self.FFInfo["LAST_TIME"].value return (tO < timeTick + Span) and (timeTick - Span < tE) def timeArray(self): # return time column """ return the file's time column """ nRow = self.FFParm["NROWS"].value if nRow == 0: print("timeArray: file has no rows") exit(1) bucket = 86400 nbuckets = int(nRow / bucket) + 1 startRec = 1 arrays = [0] * nbuckets for i in range(nbuckets): stop_Rec = min(nRow, startRec + bucket) nRec = stop_Rec - startRec + 1 slice = self.DID.sliceArray(startRec, nRec) arrays[i] = slice["time"] startRec = stop_Rec + 1 tArray = numpy.concatenate(arrays) return tArray def recordsToColumns(records): """ converts an array of records to an array of columns :param records: array of flatfile records :return: returns array of columns """ columns = tuple(zip(*records)) return columns def ColumnStats(columns, flag, NoTime=True): """ Get max, min, absolute max/min, and sep values :param columns: tuple (time, floats....) or (floats....) :param flag: missing-data flag; values at or above flag * .999 are ignored :param NoTime: True if the tuple has no time column
:return: stats """ prefix = 1 if NoTime else 0 ncols = len(columns) + prefix f = flag * .999 # ignore precision beyond 3 digits smx = [0] * ncols smn = [0] * ncols amx = [0] * ncols amn = [0] * ncols sep = [0] * ncols # dif = [0] * (ncols + prefix) # nvalues = len(columns[0]) # diff = [0] * ncols smn[0] = amn[0] = 0 smx[0] = amx[0] = 0 sep[0] = 0.0 for i in range(1, ncols): j = i - prefix values = columns[j] if flag: vals = values[values < f] else: vals = values avalues = numpy.absolute(vals) if len(vals): smn[i] = numpy.amin(vals) smx[i] = numpy.amax(vals) amn[i] = numpy.amin(avalues) amx[i] = numpy.amax(avalues) else: # no values smn[i] = 0 smx[i] = 0 amn[i] = 0 amx[i] = 0 sep[i] = 0 continue # amx[i] = numpy.amax(avalues) # print("MIN", min(vals), numpy.amin(vals)) # print("MAX", max(vals), numpy.amax(vals)) # print("SIN", min(avalues), numpy.amin(avalues)) # print("Sax", min(avalues), numpy.amin(avalues)) if i == 1: prev = values[values < f] # diff[1] = prev difm = prev else: curr = columns[j] prev = columns[j - 1] # diff[i] = [c - p for (c,p) in zip(curr, prev) if c < f and p < f] dif = curr - prev mask = (curr > f) | (prev > f) difm = numpy.ma.masked_array(dif, mask=mask) # sep[i] = max(diff[i]) # sep[i] = max(difm) # sep[i] = numpy.amax(difm) sep[i] = abs(numpy.amax(difm)) # leel 2015 Apr 22 # print("SEP", max(difm), numpy.amax(difm)) return(smn, smx, amn, amx, sep) # Flat file routines from ff_igpp # ID ff_open -> opens file streams, reads in parameters # determins open mode argument # ID ff_close -> writes/updates header files # x DID ff_get/put -> get/put data records # x HID ff_cget/cput -> get/put header records # x OBS1 ff_buildname -> generate name pairs (name.{ffd,ffh}) # not needed # x OBS2 strsrch -> searches string within a string # use python string functions # x DHID ff_setrow/ffgetrow -> set/get the current row position for both header/data # HID ff_hget/ff_hput -> get/put column descriptor data to header file # HID ff_hgetinfo/ff_hputinfo -> get/put flatfile information from the header file # HID ff_hupdinfo -> replaces information data in the header file # x HID ff_copyabstact -> extract abstract from the header file # ??? ff_reporterror -> error report system to stderr # ??? 
ff_routineerror -> report an error relative to the calling routine # DID ff_bsearcherror -> binary search for the record closest to a given time # x HID ff_setUnitFormat -> number of characters used for the unit column # x HID GetUnitLength -> calculate unit column length from the header # HID PrintFFColDesc -> print descriptor for one column # x HID PrintFFParam -> print parameters used to open the files # replaced by getParameters # x HID PrintFFInfo -> print flatfile information # replaced by getInfo # DID getstoptime -> get the actual stop time from the data file # x done if __name__ == '__main__': # F_ID = FF_ID("fftest", status=FF_STATUS.READ) # F_ID.open() # key words set the FF_Parm values here # Does not check whether recl, ncols and the column descriptors # actually match # F_ID = FF_ID("fftest", status=FF_STATUS.READ) F_ID = FF_ID("D151_1m", status=FF_STATUS.READ) F_ID.open() # read in data if F_ID.valid: print("NUMBER ROWS ", F_ID.DID.nrows) for i in range(10): record = F_ID.DID.get() print("READ IN ", record) for i in range(10): r = F_ID.ffsearch(i * 600 + 30, 1, 100) print("I ", i, "row ", r, "time", i * 600 + 30) dt = F_ID.getDataType() x = F_ID.getSlice(1, 2) F_ID.getResolution() F_ID.close() FNEW = FF_ID("new", status=FF_STATUS.WRITE, recl=20, ncols=4, epoch=FF_EPOCH.J2000, version="TESTVERSION") FNEW.open() # write data for i in range(10): rec = [i * 60, i, i * 10, i * i] tup = tuple(rec) FNEW.DID.put(tup) # update parameters and info before closing # FNEW.colDesc.setDesc(2, ["RTWOR", "unit", "source", "R", 8]) # FNEW.colDesc.setDesc(3, ["THREE", "unit", "source", "R", 8]) FNEW.setParameters(NROWS=10) FNEW.setInfo(FIRST_TIME="2000 001 Jan 01 00:00:00", LAST_TIME="2001 001 Jan 01 00:00:00", RESOLUTION="00:00:10", OWNER="WHO DO YOU KNOW", ORBITS=(3, 4), LAST_LEAP=lastLeap()) # ORBITS=(3, 4), EPOCH="Y2000") FNEW.setAbstract(["ONE", "TWO", "THREE"]) FNEW.close() """ NOTES 2013 mar 26 -> first iteration, imitate the original C code Comments: The format attribute changes depending on the output and may be placed in the header file, with each attribute written per 72-char line. Set time as a class. TO_DO: have a method generated which reads a record based on the column descriptor formats. Note: not using the array module for reading/writing since the record may be of mixed types; using pack/unpack instead. Later, use FF_INFO to generate the dform and store it in the FF_DID instance; if it exists, use it. Add getResolution; if not defined, determine the dt of the flatfile, store it, and return dt. """ ============================================================================== FF_Info.py ============================================================================== from datetime import datetime from FF_Static import static from FF_Time import FFTIME, lastLeap from FF_Time import FF_EPOCH # FF INFO labels as written in the flatfile header # leel 2016/02/09 for speed there's no arglist error checking for public methods FF_INFO = static(FIRST_TIME="FIRST TIME", LAST_TIME="LAST TIME", RESOLUTION="AVERAGE INTERVAL", ERROR_FLAG="MISSING DATA FLAG", OWNER="OWNER", ORBITS="ORBIT NUMBER(S)", LAST_LEAP="LAST LEAP", APOGEE="APOGEE") # FFINFO - items are not in a specific order, nor do they necessarily appear # in the flat file header # FFPARM -> derived FFINFO class class _FF_TEXT: """ flatfile information as text""" # """descriptor class, manages the text attribute # saves the value/text pair # uses the T2V function, if defined # uses the V2T function, if defined # formatted as in the header files # calculates the values as used in the data # (epoch handling not yet worked out) # """
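# Example (added for documentation, not part of the original library):
# the info/value descriptor pair keeps the text and numeric forms in
# sync; a sketch with assumed values:
#
#     r = FFINFO("RESOLUTION", "00:00:05", TYPE=float)
#     r.value        # -> 5.0 (seconds, parsed by _info2Res)
#     r.value = 5.0
#     r.info         # -> "00:00:05.000" (reformatted by _res2Info)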
def __get__(self, instance, value): # print("getInfox", instance._name, instance._value, type(instance._value)) if instance._format is not None: if type(instance._value) is not list: return instance._format % instance._value else: info = instance._format % tuple(instance._value) return info return instance._info def __set__(self, instance, value): instance._save = True if type(value) == str: instance._info = value if instance.T2V is None: instance._value = instance._type(value) else: instance.T2V(instance) else: instance._value = value if instance.V2T is None: instance._info = str(value) else: instance.V2T(instance) # print(instance._name, value, instance._info, instance._value) class _FF_VALUE: """ flatfile information as values """ # """descriptor class, manages the value attribute # saves the value/text pair # uses the T2V function, if defined # uses the V2T function, if defined # formatted as in the header files # calculates the values as used in the data # (epoch handling not yet worked out) # """ def __get__(self, instance, value): # print("getValue", instance._name, instance._value) return instance._value def __set__(self, instance, value): # print("setVALUE") instance._save = True if type(value) == str: instance._info = value if instance.T2V is None: instance._value = instance._type(value) else: instance.T2V(instance) else: instance._value = value if instance.V2T is None: instance._info = str(value) else: instance.V2T(instance) # print(instance._name, value, instance._info, instance._value) class FFINFO: """flatfile information: start/stop time error flag resolution owner orbit numbers apogee numbers last leap second usage : info = FFINFO(key, info/value, TYPE=str) key = one of FF_INFO: FIRST_TIME, LAST_TIME, ERROR_FLAG, RESOLUTION, OWNER, ORBITS, APOGEE, LAST_LEAP info/value => info/value pair Info -> formatted string as written from the flatfile TYPE -> value type """ # leel 2016/02/09 add exists def __init__(self, name, value, TYPE=str, FORMAT=None, SAVE=False): """ usage name FFINFO.key , value, float or string """ # check if name is in the table status = name in FFINFO._AttList if status is False: self.V2T, self.T2V = None, None else: self.V2T, self.T2V = FFINFO._table[name] self._name = name self._type = TYPE self._save = SAVE # whether to record in the flatfile, used in FF_File.getInfo self._format = FORMAT # user defined format (for FFPARM) self.epoch = FF_EPOCH.Y1966 if type(value) == str: self._info = value if self.T2V is None: self._value = self._type(value) else: self.T2V(self) else: self._value = value if self.V2T is None: self._info = str(value) else: self.V2T(self) def __str__(self): """ return name, info, value, type, save """ return str([self._name, self._info, self._value, self._type, self._save]) # private functions, text/value converters: # First/Last Time, OWNER, ERROR_FLAG, AVERAGE INTERVAL, ORBIT NUMBERS def _orbit2Info(self): """ value is a tuple pair""" self._info = str(self._value[0]) + " - " + str(self._value[1]) def _info2Orbit(self): self._value = tuple(int(orbit) for orbit in self._info.split(" - ")) def setOrbit(self, value): """ setOrbit("x - y") or setOrbit((x,y)) """ if type(value) == str: self._value = tuple(int(orbit) for orbit in value.split(" - ")) if type(value) == tuple: self._value = value self._save = True def _lastLeap2Info(self): self._info = str(lastLeap()) def _info2LastLeap(self): self._value = lastLeap() def setLastLeap(self): # instance method, not class if self._info: self._value = lastLeap() self._info = lastLeap() self._save = True
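# Example (added for documentation, not part of the original library):
# ORBITS round-trips between the header string and a tuple of integers:
#
#     o = FFINFO("ORBITS", "3 - 4")
#     o.value             # -> (3, 4)   (via _info2Orbit)
#     o.setOrbit((5, 6))  # value becomes (5, 6)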
def _apogee2Info(self): """ value is a tuple pair""" self._info = str(self._value[0]) + " - " + str(self._value[1]) # self._info = self._value def _info2Apogee(self): # print("info to Apogee", self._info, type(self._info)) # if self._info: # self._value = str(self._info) self._value = tuple(int(apogee) for apogee in self._info.split(" - ")) def setApogee(self, apogee): """ setApogee('x - y') or setApogee((x, y)) """ if type(apogee) == str: self._value = tuple(int(apogee) for apogee in apogee.split(" - ")) if type(apogee) == tuple: self._value = apogee self._save = True # self.apogee = int(apogee) # self._save = True def setEpoch(self, epoch): # instance method, not class self.epoch = epoch if self._info: t = FFTIME(self._info, Epoch=self.epoch) self._value = t.tick def _time2Info(self): t = FFTIME(self._value, Epoch=self.epoch) self._info = t.UTC # print(" time2Info ", self._info) def _info2Time(self): t = FFTIME(self._info.replace(" UTC", ""), Epoch=self.epoch) self._value = t.tick def _res2Info(self): # format is "UNKNOWN", "HIGH RESOLUTION", or HH:MM:SS # for value < 0, value == 0, value > 0 respectively self._info = "UNKNOWN" if self._value == 0: self._info = "HIGH RESOLUTION" if self._value > 0: hour = int(self._value // 3600) left = self._value % 3600 minute = int(left // 60) sec = int(left % 60) ms = int((self._value - int(self._value)) * 1000) string = "%2d:%2d:%2d.%3d" % (hour, minute, sec, ms) self._info = string.replace(" ", "0") def _info2Res(self): value = -1 if self._info == "HIGH RESOLUTION": value = 0 else: try: it = list(map(float, self._info.split(":"))) if it[0] < 24 and it[1] < 60 and it[2] < 60: value = it[0] * 3600 + it[1] * 60 + it[2] except Exception: value = -1 self._value = value def _flag2Info(self): self._info = format(self._value, "10.5e") def _info2Flag(self): self._value = float(self._info) _AttList = ["FIRST_TIME", "LAST_TIME", "RESOLUTION", "ERROR_FLAG", "OWNER", "ORBITS", "LAST_LEAP", "APOGEE"] _V2TList = [_time2Info, _time2Info, _res2Info, _flag2Info, None, _orbit2Info, _lastLeap2Info, _apogee2Info] _T2VList = [_info2Time, _info2Time, _info2Res, _info2Flag, None, _info2Orbit, _info2LastLeap, _info2Apogee] t = list(zip(_AttList, _V2TList, _T2VList)) _table = dict([att, (v2t, t2v)] for att, v2t, t2v in t) value = _FF_VALUE() info = _FF_TEXT() class FFPARM(FFINFO): """ Flatfile parameters: creation date, number of columns, record length, number of rows, operating system, epoch usage : parm = FFPARM(key, info/value, TYPE=str) key = info label, info/value => info/value pair Info -> formatted string as written in the flatfile """ # make a dictionary for "FIRST_TIME,LAST_TIME,RESOLUTION,FLAG,ORBITS) # key (T2V,V2T) def __init__(self, name, value, TYPE=str, FORMAT=None, SAVE=True): """ name : string CDATE,RECL,NCOLS,NROWS,OPSYS,EPOCH value: parameter value TYPE: how the value is saved (value or formatted string) FORMAT: user defined format for storing the value SAVE: boolean, record parameter to flatfile header """ # check if name is in the table status = name in FFPARM._AttList if status is False: self.V2T, self.T2V = None, None else: self.V2T, self.T2V = FFPARM._table[name] self._name = name self._type = TYPE self._save = SAVE # whether to record in the flatfile self._format = FORMAT # user defined format isListofStrings = type(value) is str if type(value) is list: isListofStrings = type(value[0]) is str isInfo = (type(value) is str) or isListofStrings if isInfo is True:
self._info = value if self.T2V is None: self._value = self._type(value) else: self.T2V(self) else: self._value = value if self.V2T is None: self._info = str(value) else: self._info = str(value) # initialize _info self.V2T(self) def __str__(self): return str([self._name, self._info, self._value, self._type, self._save]) # private functions, converters for FF_PARM parameters: # CDATE -> info may be string, or list[2], value is tuple(2) def _Text2CDATE(self): # if value is not a string calcuated the string # cdate may include update # if value == -1 , update, keep cdate, add update # if value == 0 , change cdate self._value = [self._info, None] if type(self._info) is str: if "UPDATE" in self._info: dates = self._info.split("UPDATE =") self._value = [date.strip() for date in dates] if type(self._info) is list: self._value = self._info def _CDATE2Text(self): if self._value is str: self._value = [self._value, None] dateTime = datetime.now().strftime('%Y %j %b %d %H:%M:%S') if self._value == -1: cdate = self._info[0] if type(self._info) is list else self._info self._value = [cdate, dateTime] if type(self._info) == str: self._info = [self._info, dateTime] if type(self._info) == list: self._info[1] = dateTime else: if self._value == 0: self._value = [dateTime, None] self._info = dateTime _AttList = ["DATA", "CDATE", "RECL", "NCOLS", "NROWS", "OPSYS", "EPOCH"] _V2TList = [None, _CDATE2Text, None, None, None, None, None] _T2VList = [None, _Text2CDATE, None, None, None, None, None] _t = list(zip(_AttList, _V2TList, _T2VList)) _table = dict([att, (v2t, t2v)] for att, v2t, t2v in _t) value = _FF_VALUE() info = _FF_TEXT() if __name__ == '__main__': # need to write examples P = FFINFO("RESOLUTION", "10:10:10.555", TYPE=float) P = FFPARM("CDATE", "2001 102 Apr 12 12:55:18") P = FFPARM("CDATE", 0) print("NEW TIME HERE", P) P = FFPARM("CDATE", ["2002 102 Apr 12 12:55:18", "2003 102 Apr 12 13:44:18"]) print(P) P.value = -1 print("UPDATE HERE", P) print(P) print("RESULTS ", P.value) P.value = "50" print("PRINT OUT") print(P.value, P.info) # change FF_SetInfo to just FF_Info, descriptor class ============================================================================= FF_Static.py ============================================================================= # $Id: FF_Static.py xxx 2013 Jul 18 leel $ # Author: Louise Lee # CopyRight: UCLA IGPP 2013 """ C enum emulator for FF_File.py, written before python enum support """ def enum(names): """create C enum : name (string) items delimited by commas.""" return type('enum', (), dict(map(reversed, enumerate( names.replace(',', ' ').split())), __slots__=()))() def static(**enums): """static class usage : const = static (key=value1, key=value2, key=value3) """ s = "" for key, val in enums.items(): s = s + str(key) + ":" + str(val) + ", " S = s[:-2] def __str__(self): return S def __listAtt__(self): return[item for item in dir(self) if item[:2] != "__"] def __listVal__(self): return[getattr(self, item) for item in dir(self) if item[:2] != "__"] def __findAtt__(self, key): list = self.__listAtt__() for item in list: if getattr(self, item) == key: return item return None D = dict(__slots__=(), __str__=__str__, __listAtt__=__listAtt__, __listVal__=__listVal__, __findAtt__=__findAtt__, **enums) return type('static', (), D)() # () force to be read only def enum_base(t, **enums): '''enums with a base class, values can be reset ''' T = type('Enum', (t,), {}) for key, val in enums.items(): setattr(T, key, T(val)) # try using tuples and do the type last return T def 
splitTextBySize(text, size): """splits string to strings of size""" strings = [text[i:i + size] for i in range(0, len(text), size)] return (strings) def firstSansSubstring(strings, substring): """ returns first string which does not contain substring""" return next(i for i, string in enumerate(strings) if not substring in string) def extractKEYS(seq, length): """ extract attribute value pairs from attribute = value strings""" KEYS = [seq[i:i + length].strip().split(' = ') for i in range(0, len(seq), length)] keys = ([K[0].strip(), K[1].strip()] for K in KEYS) return dict(keys) def selectionFromList(keys, selection): """ return (list) selected indices in keys for selection keys -> list keys -> list, selected items """ select_set = set(selection) return [i for i, v in enumerate(keys) if v in select_set] ============================================================================== FF_Time.py ============================================================================== # $Id: FF_Time.py xxxx 2014-Apr-14 hh:mm:ssz leel $ # Author: Louise Lee # Copyright: UCLA IGPP 2014 """ Time representation: - FFTime, time representation Attributes - utc UTC format - date timestamp with leapseconds ignored - tick time ticks relative to Epoch - epoch epoch reference """ # meta classes # BASE is at 1970 # _FF_DATE - ticks date format (NOT UTC) # _FF_INT8 - 8 integer list # _FF_TICK - ticks relative to epoch # _FF_UTC - Time int UTC import os import sys import inspect import time import datetime import pytz #from sysUtilities import getResourcePath #just including this in FF lib, this shouldnt have any dependencies from FF_Static import static FF_EPOCH = static(Y1970="Y1970", Y1966="Y1966", Y2000="Y2000", J2000="J2000", Y1958="1958") FF_LEAPFILE = "tai-utc.dat" # FF_ERROR = 0.0005 def inTimeSpan(tick, tO, tE): """ checks whether t is within tO, tE within tolerance of FF_ERROR, assumes same epoch :param t: time to check :param tO: start time :param tE: stop time :return: boolean """ return (tO - tick) > FF_ERROR or (tE - tick) < FF_ERROR def getResourcePath(): if hasattr(sys, 'frozen'): ePath = os.path.dirname(sys.executable) else: frame = inspect.stack()[1] module = inspect.getmodule(frame[0]) filename = module.__file__ ePath = os.path.dirname(os.path.abspath(filename)) if sys.platform == 'win32': path = os.path.join(ePath, "Resources") else: path = os.path.join(ePath, "..", "Resources") return path # ePath = getHome() return ePath def leapFile(): """ Get the default leapfile path/name. Look for the leapfile in the directory of the caller, the directory specified by the KERNPATH environment variable, and finally in the Resources directory. """ # Get the path of the caller. if hasattr(sys, 'frozen'): ePath = os.path.dirname(sys.executable) else: frame = inspect.stack()[1] module = inspect.getmodule(frame[0]) filename = module.__file__ ePath = os.path.dirname(os.path.abspath(filename)) # Look in the directory of the caller. fName = os.path.join(ePath, FF_LEAPFILE) if os.path.isfile(fName): return fName # Look in the directory specified by KERNPATH. path = os.getenv("KERNPATH") if path: fName = os.path.join(path, FF_LEAPFILE) if os.path.isfile(fName): return fName # Look in the Resources directory. 
rPath = getResourcePath() fName = os.path.join(rPath, FF_LEAPFILE) if os.path.isfile(fName): return fName return fName class _taiutc: """ tai-utc.dat entry, plus correlated time ticks """ def __init__(self, year=1966, mon="Jan", day=1, jd=0, dt=0, off=0, scale=0): self.year = int(year) self.mon = mon[:1].upper() + mon[1:].lower() self.mm = _taiutc.MONDict[self.mon] self.day = int(day) self.jd = float(jd) # julian day self.dt = float(dt) # leap second offset self.off = float(off) # julian day offset self.scale = float(scale) # scale factor self.datetime = datetime.datetime(self.year, self.mm, self.day, 0, 0, 0, 0, pytz.UTC) dt = self.datetime - _taiutc._TBASE self.date = self.datetime.strftime("%d/%m/%y %H:%M:%S.%f") self.btime = dt.total_seconds() # Ticks from base time da = self.datetime - _taiutc.ABASE self.atime = da.total_seconds() # Ticks from Jan 1, 2000 def __str__(self): """ time components """ tup = (self.year, self.mon, self.day, self.jd, self.dt, self.off, self.scale, self.atime, self.date) s = str(tup) return s _TBASE = datetime.datetime(1970, 1, 1, 0, 0, 0, 0, pytz.UTC) ABASE = datetime.datetime(2000, 1, 1, 0, 0, 0, 0, pytz.UTC) MONDict = {"Non": 0, "Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun": 6, "Jul": 7, "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12} class _FF_TICK: # aka T2UTC """ tick offset """ def __get__(self, instance, value): return instance._tick def __set__(self, instance, value): instance._base = value + FFTIME._TOFFS[instance.epoch] instance._tick = value instance._base2List() # set _list instance._date = FFTIME.list2Date(instance._list) instance._tick2UTC() class _FF_UTC: # aka UTC2{T,IT} """ UTC time """ def __get__(self, instance, value): return instance._utc def __set__(self, instance, value): instance._utc = value instance._UTC2Tick() # set _tick, _base, _iUTC instance._base2List() # set _list instance._date = FFTIME.list2Date(instance._list) # for epochs 1970 and 1966, leapseconds are ignored ignoreLeap = set([FF_EPOCH.Y1970, FF_EPOCH.Y1966]) # print(" UTC IS ", instance._utc, instance._iUTC) if instance.epoch in ignoreLeap: if (instance._iUTC[6] == 60): instance._iUTC[6] = 59 instance._utc = instance._date class _FF_DATE: # only accepts UTC STRINGS """ time in date format """ def __get__(self, instance, value): return instance._date def __set__(self, instance, value): print("Warning date cannot be modified") # convert offsets to other forms class _FF_DATELIST: # aka it2UTC """ time as integer list """ def __get__(self, instance, value): return instance._list def __set__(self, instance, value): it = value[:] instance._date = FFTIME.list2Date(it) dt = datetime.datetime(it[0], it[2], it[3], it[4], it[5], it[6], it[7], pytz.UTC) tick = (dt - FFTIME._TBASE).total_seconds() instance._tick = tick # diff = tick - FFTIME._TOFFS[instance.epoch] instance._tick = tick - FFTIME._TOFFS[instance.epoch] instance._base = tick instance._tick2UTC() class FFTIME: """ Time : supported epochs: Base Time : Jan 1, 1970 00:00:00 no leapseconds Cline Time : Jan 1, 1966 00:00:00 no leapseconds Y2000 Time : Jan 1, 2000 00:00:00 leapseconds J2000 Time : Jan 1, 2000 12:00:00 leapseconds """
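# Example (added for documentation, not part of the original library):
# constructing from a tick or from a UTC string; values below are
# illustrative:
#
#     t = FFTIME(0.0, Epoch="Y1966")
#     t.UTC    # -> "1966 001 Jan 01 00:00:00.000" (Y1966 ignores leap seconds)
#     u = FFTIME("2019 060 Mar 01 00:00:00.000", Epoch="Y1966")
#     u.tick   # -> seconds elapsed since 1966-01-01 00:00:00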
def __init__(self, value, Epoch=FF_EPOCH.Y2000, Verbose=False): """ value -> tick or UTC epoch -> one of Y1966, Y2000, J2000 Verbose -> output value in tick or date formats Does not check the format of UTC time UTC = YYYY DOY Mon DD HH:MM:SS.sss """ self.epoch = Epoch self._base = time.gmtime() # system Epoch (1970) offset self._tick = 0 # Epoch ticks self._date = "2000 001 Jan 1 00:00:00" # Epoch timestamp self._list = [2000, 1, 1, 1, 0, 0, 0, 0] # Epoch time as list self._utc = "2000 001 Jan 1 00:00:00" # UTC form if type(value) is str: # assume date is UTC self._utc = value self._UTC2Tick() # set _tick, _base, _iUTC self._base2List() # set _list self._date = FFTIME.list2Date(self._list) if Verbose: print("INPUT", value, "OUTPUT DATE", self._date, self._tick) if FFTIME._isNumeric(value): self._base = value + FFTIME._TOFFS[self.epoch] self._tick = value self._base2List() # set _list self._date = FFTIME.list2Date(self._list) self._tick2UTC() if Verbose: print("INPUT", value, "OUTPUT", self._date, "UTC", self._utc) def __str__(self): """ tick, date, UTC """ string = str(self.tick) + " DATE " + self.date + " UTC " + self.UTC return string def _base2List(self): """ convert base tick to list """ # there appears to be a bound at -43200 (1969 12 31 12:00) for PCs # t = time.gmtime(self._base) t = _GMTIME(self._base) ms = int(round((self._base - int(self._base)) * 1000)) sec = t.tm_sec if ms < 0: # should not be negative ms += 1000 sec = t.tm_sec - 1 self._list = [t.tm_year, t.tm_yday, t.tm_mon, t.tm_mday, t.tm_hour, t.tm_min, sec, ms] def _int2Off(self): pass def _int2UTC(self): pass def _tick2UTC(self): """ convert Epoch ticks to UTC base and Cline ignore leapseconds """ if self.epoch == FF_EPOCH.Y2000 or self.epoch == FF_EPOCH.J2000: self._utc = FFTIME.tick2UTC(self._tick, self._list, self.epoch) else: # for leap second unaware self._utc = self._date def _EpochTick2UTC(self): # may be obsolete """ convert leapsecond aware epoch ticks to UTC """ # calibrate ticks to system epoch (1970) # base still has leapseconds base = self._tick + FFTIME._TOFFS[self.epoch] + 1 it = self._list index = FFTIME._getLeapEntry(base) entry = FFTIME._TLEAP[index] leap = entry.dt # base -> includes leapseconds # calculate MJD with leapseconds # remove the leapseconds # calculate MJD with the new time if (index < 15): MJD = FFTIME.list2MJD(it) adjust = (MJD - entry.off) * entry.scale newt = base - leap - adjust list = FFTIME.tick2List(newt) # repeat, second list has leapseconds removed MJD = FFTIME.list2MJD(list) adjust = (MJD - entry.off) * entry.scale leap = entry.dt + adjust # adjust base sans leapseconds tbase = base - leap # convert to list diff = tbase - entry.btime addone = 0 if diff >= 1: addone = -1 list = FFTIME.tick2List(tbase + addone) if diff >= 0 and diff < 1: list[6] = 60 self._utc = FFTIME.list2Date(list) def _UTC2BaseTick(self): """ converts UTC to an offset from the system epoch the base offset is not leapsecond adjusted """ value = self._utc.replace(':', ' ').replace('UTC', '').replace('.', ' ').split() month = value[2] Month = month[0].upper() + month[1:].lower() mon = FFTIME.MONDict[Month] value[2] = str(mon) it = [int(i) for i in value] ext = [0] * 6 it.extend(ext) # pad for truncated items if (it[0] < 50): it[0] = it[0] + 2000 if (it[0] < 100): it[0] = it[0] + 1900 sec = it[6] if sec == 60: # leapsecond sec = 59 dt = datetime.datetime(it[0], it[2], it[3], it[4], it[5], sec, 1000 * it[7], pytz.UTC) base = (dt - FFTIME._TBASE).total_seconds() self._iUTC = it return base def _UTC2Tick(self): """ UTC to time offsets, base and epoch""" if self.epoch == FF_EPOCH.Y2000 or self.epoch == FF_EPOCH.J2000: self._UTC2EpochTick() else: self._base = self._UTC2BaseTick() self._tick = self._base - FFTIME._TOFFS[self.epoch] def _UTC2EpochTick(self): """ UTC to time offsets for Y2000, J2000 """ value = self._utc.replace(':', ' ').replace('.', ' ').split() addleap = 0 if int(value[6]) == 60: # leapsecond addleap = 1 base = self._UTC2BaseTick()
    def _UTC2EpochTick(self):
        """ UTC to time offsets for Y2000, J2000 """
        value = self._utc.replace(':', ' ').replace('.', ' ').split()
        addleap = 0
        if int(value[6]) == 60:  # leapsecond
            addleap = 1
        base = self._UTC2BaseTick()
        t = base
        index = FFTIME._getLeapEntry(t)
        entry = FFTIME._TLEAP[index]
        # between Jan 1, 1961 and Jan 1, 1972 leapseconds are fractions
        if (index < 15):
            MJD = FFTIME.list2MJD(self._iUTC)
            adjust = (MJD - entry.off) * entry.scale
            leap = entry.dt + adjust
        else:
            leap = entry.dt
        self._tick = t + leap - FFTIME._TOFFS[self.epoch] + addleap
        self._base = t + leap + addleap

    def leap(self):
        """ return leapsecond """
        leap = 0
        if self.epoch == FF_EPOCH.Y2000 or self.epoch == FF_EPOCH.J2000:
            To = FFTIME(self._date)
            Tu = FFTIME(self.UTC)
            leap = To._tick - Tu._tick
        return leap

    def _isNumeric(value):
        try:
            float(value)
        except ValueError:
            return False
        return True

    def _fillOffsets():
        # base, Cline Time, Y2000, J2000
        offsets = {"Y1970": 0, "Y1966": 0, "Y2000": 0, "J2000": 0}
        UT = datetime.datetime(1970, 1, 1, 0, 0, 0, 0, pytz.UTC)
        CT = datetime.datetime(1966, 1, 1, 0, 0, 0, 0, pytz.UTC)
        YT = datetime.datetime(2000, 1, 1, 0, 0, 0, 0, pytz.UTC)
        JT = datetime.datetime(2000, 1, 1, 12, 0, 0, 0, pytz.UTC)
        offsets["Y1966"] = (CT - UT).total_seconds()
        offsets["Y2000"] = (YT - UT).total_seconds()
        offsets["J2000"] = (JT - UT).total_seconds()
        return offsets

    def list2Date(it):
        """ converts time from integer list to date """
        list = it[:]
        mon = FFTIME.MONLIST[it[2]]
        list[2] = mon
        date = FFTIME._TFORM % tuple(list)
        return date

    def date2List(date):
        """ converts date to an integer list """
        value = date.replace(':', ' ').replace('.', ' ').split()
        month = value[2]
        Month = month[0].upper() + month[1:].lower()
        mon = FFTIME.MONDict[Month]
        value[2] = str(mon)
        it = [int(i) for i in value]
        return it

    def tick2List(ticks):
        """ reformat ticks to integer list """
        # t = time.gmtime(ticks)
        t = _GMTIME(ticks)
        ms = int(round((ticks - int(ticks)) * 1000))
        list = [t.tm_year, t.tm_yday, t.tm_mon, t.tm_mday, t.tm_hour,
                t.tm_min, t.tm_sec, ms]
        return list

    def tick2UTC(tick, list, epoch):
        """ convert leapsecond aware epoch ticks to UTC """
        # calibrate ticks to system epoch (1970)
        # base still has leapseconds
        if epoch == FF_EPOCH.Y1970 or epoch == FF_EPOCH.Y1966:
            utc = FFTIME.list2Date(list)
            return utc
        base = tick + FFTIME._TOFFS[epoch] + 1
        it = list[:]
        index = FFTIME._getLeapEntry(base)
        entry = FFTIME._TLEAP[index]
        leap = entry.dt
        # base -> includes leapseconds
        # calculate MJD with leapseconds
        # remove the leapseconds
        # calculate MJD with new time
        if (index < 15):
            MJD = FFTIME.list2MJD(it)
            adjust = (MJD - entry.off) * entry.scale
            newt = base - leap - adjust
            list = FFTIME.tick2List(newt)
            # repeat, second list has leapseconds removed
            MJD = FFTIME.list2MJD(list)
            adjust = (MJD - entry.off) * entry.scale
            leap = entry.dt + adjust
            print(" ADJUST")
        # adjust base sans leapseconds
        tbase = base - leap
        # convert to list
        diff = tbase - entry.btime
        addone = 0
        if diff >= 0:
            addone = -1
        list = FFTIME.tick2List(tbase + addone)
        if diff >= 0 and diff < 1:
            list[6] = 60
        utc = FFTIME.list2Date(list)
        return utc

    def list2MJD(it):
        """ reformat list to MJD """
        DaysPerYear = 365
        JD2000 = 2451544.5  # Julian date for Jan 1, 2000
        JD2MJD = 2451544.5  # JD - MJD
        year = it[0]
        DOY = it[1]
        if (year <= 99):
            year = year + 1900
        dy = 2000 - year
        JD = JD2000 - (dy * DaysPerYear)
        nleapdays = dy // 4     # number of 4 year intervals
        n100year = dy // 100    # number of 100 year intervals
        n1000year = dy // 1000  # number of 1000 year intervals
        JD = JD - nleapdays + n100year - n1000year + DOY - 1
        MJD = JD - JD2MJD
        return MJD
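    # Note (illustrative, assuming the standard tai-utc.dat layout): a typical
    # leapsecond file entry reads
    #   1972 JAN  1 =JD 2441317.5  TAI-UTC= 10.0       S + (MJD - 41317.) X 0.0      S
    # After replacing ')' and splitting, _fillLeapTable below takes the date
    # from tokens 0-2, the Julian day from token 4, the leapsecond offset from
    # token 6, the MJD origin from token 11, and the scale factor from token 13.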
print("Unable to read leapsecond file", fname, ". Exiting.") exit(1) text = fh.readlines() nrec = len(text) ENTRYS = [0] * (nrec + 1) ENTRYS[0] = _taiutc() for line, i in zip(text, np.arange(nrec)): text = line.replace(')', ' ').split() if len(text) == 0: print("Found empty line,", i, ". Exiting.") break e = _taiutc(year=text[0], mon=text[1], day=text[2], jd=text[4], dt=text[6], off=text[11], scale=text[13][0:9]) ENTRYS[i + 1] = e ENTRYS[0].atime = ENTRYS[0].atime - .001 return ENTRYS def _getLeapEntry(t): """ find index of record in leapsecond table """ leap = None # record index last = len(FFTIME._TLEAP) - 1 if (t < FFTIME._TLEAP[0].btime): # before first leapseconds leap = 0 if (t >= FFTIME._TLEAP[last].btime): # after last entry leap = last if (leap is None): # find entry within table index = [i for i, item in enumerate(FFTIME._TLEAP) if t < item.btime] # entry after t leap = index[0] - 1 return leap # YYYY DOY MON mm HH:MM:SS.sss # YEAR = 0 # DOY = 1 # MONTH = 2 # DOM = 3 # HOUR = 4 # MINUTE = 5 # SECOND = 7 # MSEC = 7 _TFORM = "%04d %03d %3s %02d %02d:%02d:%02d.%03d" _TBASE = datetime.datetime(1970, 1, 1, 0, 0, 0, 0, pytz.UTC) _TLEAP = _fillLeapTable() _TOFFS = _fillOffsets() MONLIST = ["Non", "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] MONDict = {"Non": 0, "Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun": 6, "Jul": 7, "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12} date = _FF_DATE() tick = _FF_TICK() UTC = _FF_UTC() list = _FF_DATELIST() def _GMTIME_MS(base): """ gmt time for windows systems""" if base >= 0: t = time.gmtime(base) return t DAY = int(24 * 60 * 60) YEAR = int(365 * DAY) b = -base nyear = int(b / YEAR) diff = nyear * YEAR + base if nyear != 0: addDay = int((nyear / 5) + 1) diff = diff + DAY * addDay year = 1969 - nyear t = time.gmtime(diff) if (nyear != 0): if (t.tm_yday <= addDay + 1): year = year + 1 T = _SudoTime(year, t) return T class _SudoTime(): """ create struct_time to support data < 1970 """ def __init__(self, year, time): self.tm_year = year self.tm_mon = time.tm_mon self.tm_mday = time.tm_mday self.tm_hour = time.tm_hour self.tm_min = time.tm_min self.tm_sec = time.tm_sec self.tm_wday = time.tm_wday self.tm_yday = time.tm_yday self.tm_isdst = time.tm_isdst def __str__(self): tup = (self.tm_year, self.tm_mon, self.tm_mday, self.tm_hour, self.tm_min, self.tm_sec, self.tm_wday, self.tm_yday, self.tm_isdst) s = "SUDOTIME " + str(tup) return s def _GMTIME_UNIX(base): t = time.gmtime(base) return t _GMTIME = _GMTIME_MS if sys.platform == 'win32' else _GMTIME_UNIX def lastLeap(Date=True): """ last known leapsecond Date True -> return the date Date False-> return the leapsecond """ if Date: return FFTIME._TLEAP[-1].date else: return FFTIME._TLEAP[-1] def leapStats(): """ display leapfile contents """ fname = leapFile() l = len(FFTIME._TLEAP) - 1 last = FFTIME._TLEAP[l].date print("LEAP TABLE : " + fname) print("LAST ENTRY : " + last) print("LEAP SECOND: " + str(FFTIME._TLEAP[l].dt)) return (leapFile() + ": last entry " + last) if __name__ == '__main__': # dts = spt.doy2date([2002] * 4, range(186, 190), dtobj=True) # print(dts) # dts = spt.Ticktock(dts, 'UTC') # print("UTC ", dts.DOY) # x = spt.Ticktock([2452331.0142361112, 2452332.0142361112], 'JD') # x = spt.Ticktock([2452331.0142361112, 2452332.0142361112], 'JD') # x.ISO # x.DOY # t = FFTIME("2000 001 Jan 01 00:00:00.000") # s = FFTIME(0) # s = FFTIME(0, Epoch="Y1970") # s = FFTIME(0, Epoch="J2000") # a = [1, 4, 7, 9] # k = 3 # x = [i for i, item in enumerate(a) 
if k < a[i]] # print(k, a, x) # k = 4 # x = [i for i, item in enumerate(a) if k < a[i]] # print(k, a, x) # k = 10 # x = [i for i, item in enumerate(a) if k < a[i]] # print(k, a, x) a = (FF_EPOCH.Y1970, FF_EPOCH.Y1966, FF_EPOCH.Y2000, FF_EPOCH.J2000) # a = (FF_EPOCH.Y1970, FF_EPOCH.Y2000) tick = [0, 1, 2, 3] for i in a: print(i) # s = FFTIME("1976 002 Jan 02 00:00:31.000", Epoch=i) s = FFTIME("2005 365 Dec 31 23:59:59.000", Epoch=i) tick[0] = s.tick print(s) s = FFTIME("2005 365 Dec 31 23:59:60.000", Epoch=i) tick[1] = s.tick print(s) s = FFTIME("2006 001 Jan 01 00:00:00.000", Epoch=i) tick[2] = s.tick print(s) s = FFTIME("2006 001 Jan 01 00:00:01.000", Epoch=i) tick[3] = s.tick print(s) s = FFTIME(tick[0], Epoch=i) print("TICK ", s) s = FFTIME(tick[1], Epoch=i) print("TICK ", s) s = FFTIME(tick[2], Epoch=i) print("TICK ", s) s = FFTIME(tick[3], Epoch=i) s.tick = 0 s.UTC = "2005 365 Dec 31 23:59:60.000" l = s.list s.list = l print(s) _GMTIME(0) print("LEAPSTATS") leapStats() # print("GO") # t = FFTIME(1.1003) # t._date2List() # print(t.time) # print(t.date) # print("HELLO") # print(FFTIME.MONDict["Jan"]) # time = spt # print(time) ============================================================================== FFCreator.py ============================================================================== import sys sys.path.insert(0, 'ffPy') import FF_File from FF_Time import FFTIME, leapFile from FF_File import FF_EPOCH, FF_ID, FF_STATUS, _FF_BASE from FF_DESC import FFDESC import numpy as np def createFF(name, times, dta, labels, units, sources, epoch): # Initialize new flat file's name, record shape, and epoch nCol = len(labels) + 1 nRows = len(times) recl = 4 + nCol * 4 newFF = FF_ID(name, status=FF_STATUS.WRITE, recl=recl, ncols=nCol, epoch=epoch, version=' MarsPy/MagPy v.2019') newFF.open() newFF.setParameters(NROWS=nRows) # Set the flat file's resolution and start/end times strtTime = FFTIME(times[0], Epoch=epoch) endTime = FFTIME(times[-1], Epoch=epoch) resolution = times[1]-times[0] res = FFTIME(resolution, Epoch=epoch) newFF.setInfo(FIRST_TIME=strtTime.UTC, LAST_TIME=endTime.UTC, RESOLUTION=res.UTC[-12:], OWNER='IGPP/UCLA') # Update the column headers setFFDescriptors(newFF, labels, sources, units) # Write data and times to flat file writeDataToFF(newFF, dta, times) # Close file before exiting newFF.close() return newFF def setFFDescriptors(ff, labels, sources, units): # Writes the header information to the flat file header = ' # NAME UNITS SOURCE TYPE LOC' ncol = len(labels) + 1 # Add 1 for time DESC = FFDESC(ncol, header) desc = ['TIME', 'SEC', '', 'T', 0] DESC.setDesc(1, desc) for i in range(2, ncol + 1): desc[0] = labels[i-2] desc[1] = units[i-2] desc[2] = sources[i-2] desc[3] = 'R' desc[4] = (i-1) * 4 + 4 DESC.setDesc(i, desc) ff.colDesc = DESC def writeDataToFF(ff, dta, times): ncol = len(dta[0]) dt = [('time', '>d'), ('data', '>f4', (ncol))] nrows = len(times) bytesToWrite = np.empty(nrows, dtype=dt) for i in range(nrows): bytesToWrite[i] = times[i], dta[i] ff.DID.file.write(bytesToWrite)