Newer
Older
TraceWin distribution file
Class afterwards hold the following
dictionary items:
- x [m]
- xp [rad]
- y [m]
- yp [rad]
- phi [rad]
- E [MeV] (kinetic energy)
"""
def __init__(self, filename=None, freq=352.21, mass=938.272, Ib=0.0):
self._columns = ["x", "xp", "y", "yp", "phi", "E"]
if filename:
# read in the file..
self._readBinaryFile()
else:
import numpy
def append(self, x=0.0, xp=0.0, y=0.0, yp=0.0, E=0.0, phi=0.0):
Append one particle to the distribution
- Kinetic Energy in MeV
- x,y in m
- xp,yp in rad
- phi in rad
import numpy
self._data = numpy.append(self._data, [[x, xp, y, yp, phi, E]], 0)
self.Np += 1
Append a matrix of particle vectors.
Matrix on form 6xN, where N is number of particles.
Each row should hold [x,xp,y,yp,phi,E]
Units m,rad, MeV
import numpy
self._data = numpy.append(self._data, array, 0)
self.Np += len(array)
def combine_dst(self, other):
Appends the particles from another dst object to this one
if abs(self.mass - other.mass) > 1e-5:
raise ValueError("Adding two distributions with differing mass: {} and {}".format(self.mass, other.mass))
if abs(self.freq - other.freq) > 1e-5:
raise ValueError("You are trying to add two distributions with differing freq: {} and {}".format(self.freq, other.freq))
self.append_many(other._data)
self.Ib = (self.Ib * self.Np + other.Ib * other.Np) / (self.Np + other.Np)
def remove(self, i=None):
Removes all particles from the distribution, or the line specified by i
import numpy
self._data = numpy.delete(self._data, numpy.s_[:], 0)
self.Np = 0
self._data = numpy.delete(self._data, i, 0)
self.Np -= 1
i16 = numpy.int16
i32 = numpy.int32
f64 = numpy.float64
Header_type = numpy.dtype([("dummy12", i16), ("Np", i32), ("Ib", f64), ("freq", f64), ("dummy3", i8)])
Header = numpy.fromfile(fin, dtype=Header_type, count=1)
self.Np = Header["Np"][0]
self.Ib = Header["Ib"][0]
self.freq = Header["freq"][0]
# Some toutatis distributions has an undocumented 7th line of 0's
Table = numpy.fromfile(fin, dtype=numpy.float64, count=self.Np * 7 + 1)
if len(Table) == self.Np * 7 + 1:
elif len(Table) == self.Np * 6 + 1: # this is true in most cases
raise ValueError("Incorrect table dimensions found:", len(Table))
self._data[:, 0] *= 1e-2
self._data[:, 2] *= 1e-2
return self._columns[:] + self._indirect_columns[:]
# makes the class function as a dictionary
# e.g. dst['x'] returns the x array..
if key == "r":
return numpy.sqrt(self["x"] ** 2 + self["y"] ** 2)
elif key == "rp":
return numpy.sqrt(self["xp"] ** 2 + self["yp"] ** 2)
else:
i = self._columns.index(key)
return self._data[:, i]
except ValueError:
raise KeyError(f"Unknown key '{key}', available keys: {self.keys()}")
i = self._columns.index(key)
self._data[:, i] = value
except ValueError:
raise KeyError(f"Unknown key '{key}', available keys: {self.keys()}")
def save(self, filename, toutatis=False):
Save the distribution file
so it can be read by TraceWin again
:param filename: Name of file
:param toutatis: Include 7th column of zeros
fout = open(filename, "wb")
fout.write(pack("b", 125))
fout.write(pack("b", 100))
fout.write(pack("i", self.Np))
fout.write(pack("d", self.Ib))
fout.write(pack("d", self.freq))
fout.write(pack("b", 125))
if toutatis and data.shape[1] == 6:
data = numpy.append(data, numpy.zeros((len(data), 1)), 1)
elif not toutatis and data.shape[1] == 7:
data = data[:, :-1]
fout.write(pack("{}d".format(len(data)), *data))
def subplot(self, index, x, y=None, nb=100, mask=None, xlim=None, ylim=None):
Create a subplot histogram similar to TraceWin.
Example::
import numpy as np
from ess import TraceWin
from matplotlib import pyplot as plt
data = TraceWin.dst('part_dtl1.dst')
m = np.where(data['E']>3.5)
data.subplot(221, 'x', 'xp', mask=m)
data.subplot(222, 'y', 'yp', mask=m)
data.subplot(223, 'phi', 'E', mask=m)
data.subplot(224, 'x', 'y', mask=m)
from matplotlib.colors import LogNorm
import matplotlib.pyplot as plt
import numpy as np
units = {
"x": "mm",
"y": "mm",
"xp": "mrad",
"yp": "mrad",
"E": "MeV",
"phi": "deg",
}
dy = dy[mask]
if x in ["x", "y", "xp", "yp"]:
dx *= 1e3
if y in ["x", "y", "xp", "yp"]:
dy *= 1e3
if x in ["phi"]:
dx -= np.average(dx)
dx *= 180 / np.pi
if y in ["phi"]:
dy -= np.average(dy)
dy *= 180 / np.pi
if x in ["E"] and max(dx) < 0.1:
dx *= 1e3
units["E"] = "keV"
if y in ["E"] and max(dy) < 0.1:
dy *= 1e3
units["E"] = "keV"
plt.hist2d(dx, dy, bins=nb, norm=LogNorm())
plt.title("{} [{}] - {} [{}]".format(x, units[x], y, units[y]))
hist, bin_edges = np.histogram(dx, bins=nb)
b = bin_edges[:-1] + 0.5 * (bin_edges[1] - bin_edges[0])
if ylim:
miny = ylim[0]
maxy = ylim[1]
else:
miny = min(dy)
maxy = max(dy)
"k",
lw=1.5,
drawstyle="steps",
)
hist, bin_edges = np.histogram(dy, bins=nb)
b = bin_edges[:-1] + 0.0 * (bin_edges[1] - bin_edges[0])
if xlim:
minx = xlim[0]
maxx = xlim[1]
else:
minx = min(dx)
maxx = max(dx)
b,
"k",
lw=1.5,
drawstyle="steps",
else:
# plot a simple 1D histogram..
plt.hist(dx, bins=nb)
if xlim:
plt.xlim(xlim)
if ylim:
plt.ylim(ylim)
Simple class to read in a
TraceWin plot file
Class afterwards hold the following
dictionary items:
- Ne (number of locations)
- Np (number of particles)
- Ib [A] (beam current)
- freq [MHz]
each plt[i], where i is element number, holds:
- Zgen [cm] (location)
- phase0 [deg] (ref phase)
- wgen [MeV] (ref energy)
- yp [array, rad]
- phi [array, rad]
- E [array, MeV]
- l [array] (is lost)
plt=ess.TraceWin.plt('calc/dtl1.plt')
for i in [97,98]:
data=plt[i]
if data:
print(data['x'])
Ryoichi Miyamoto
committed
def __init__(self, filename, flag_remove_loss=True):
Ryoichi Miyamoto
committed
# option to remove lost particles, default True
self.flag_remove_loss = flag_remove_loss
self._columns = ["x", "xp", "y", "yp", "phi", "E", "l"]
# read in the file..
self._readBinaryFile()
def _readBinaryFile(self):
# Thanks Emma!
import numpy
Header_type = numpy.dtype(
[
("dummy12", numpy.int16),
("Ne", numpy.int32),
("Np", numpy.int32),
("Ib", numpy.float64),
("freq", numpy.float64),
("mc2", numpy.float64),
]
)
# shortnaming
i8 = numpy.int8
i32 = numpy.int32
f64 = numpy.float64
SubHeader_type = numpy.dtype([("dummy12", i8), ("Nelp", i32), ("Zgen", f64), ("phase0", f64), ("wgen", f64)])
Header = numpy.fromfile(fin, dtype=Header_type, count=1)
self.Np = Header["Np"][0]
self.Ne = Header["Ne"][0]
self.Ib = Header["Ib"][0]
self.freq = Header["freq"][0]
self.mc2 = Header["mc2"][0]
self._data = []
self.Nelp = []
i = 0
while i < self.Ne:
SubHeader = numpy.fromfile(fin, dtype=SubHeader_type, count=1)
# unfinished files need this fix (simulation still running)
Table = numpy.fromfile(fin, dtype=numpy.float32, count=self.Np * 7)
Table = Table.reshape(self.Np, 7)
data = {}
for key in ["Zgen", "phase0", "wgen"]:
data[key] = SubHeader[key][0]
for j in range(7):
c = self._columns[j]
data[c] = Table[:, j]
# convert x,y from cm to m
if c in ["x", "y"]:
data[c] *= 1e-2
self._data.append(data)
def __getitem__(self, key):
if key in self.Nelp:
Ryoichi Miyamoto
committed
# if want to EXCLUDE lost particles (default):
Ryoichi Miyamoto
committed
lost_mask = self._data[i]["l"] == 0
for para in self._data[i]:
if isinstance(self._data[i][para], numpy.ndarray):
ret[para] = self._data[i][para][lost_mask]
else:
ret[para] = self._data[i][para]
# if want to KEEP lost particles:
else:
for para in self._data[i]:
ret[para] = self._data[i][para]
Ryoichi Miyamoto
committed
Generates self.s which holds
the position of each element
in metres
self.s.append(self[i]["Zgen"] / 100.0)
self.s = numpy.array(self.s)
Calculates averages of 6D coordinates at each
element, such that e.g.
self.avg["x"] gives average X at each location.
self.avg = dict(x=[], xp=[], y=[], yp=[], E=[], phi=[])
for v in vals:
self.avg[v].append(numpy.average(data[v]))
at each position, based on
for i, j in zip(self.Nelp, range(len(self.Nelp))):
Eavg = self.avg["E"][j]
self.beta.append(numpy.sqrt(1.0 - 1.0 / self.gamma[-1] ** 2))
self.gamma = numpy.array(self.gamma)
self.beta = numpy.array(self.beta)
Calculates min/max values of beam coordinates
in percentile, pmin is lower and pmax upper.
Units: cm
self.min = dict(x=[], xp=[], y=[], yp=[], E=[])
self.max = dict(x=[], xp=[], y=[], yp=[], E=[])
self.min[v].append(numpy.percentile(data[v], pmin))
self.max[v].append(numpy.percentile(data[v], pmax))
self.min[v] = numpy.array(self.min[v])
self.max[v] = numpy.array(self.max[v])
Calculates the sigma matrix
Creates self.sigma such that self.sigma[i,j]
returns the sigma matrix for value i,j.
The numbering is:
0: x
1: xp
2: y
3: yp
4: E
5: phi
for j in range(len(self.Nelp)):
self.sigma.append([[numpy.mean((data[n] - self.avg[n][j]) * (data[m] - self.avg[m][j])) for n in vals] for m in vals])
Yngve Levinsen
committed
self.sigma = numpy.array(self.sigma).transpose()
for j in range(len(vals)):
Yngve Levinsen
committed
self.std[v] = numpy.sqrt(self.sigma[j, j])
Calculates emittance, beta, alfa, gamma
for each plane, x-xp, y-yp, and E-phi
for j in range(len(self.Nelp)):
Yngve Levinsen
committed
self.twiss_eps.append([numpy.sqrt(numpy.linalg.det(self.sigma[i : i + 2, i : i + 2, j])) for i in (0, 2, 4)])
self.twiss_eps = numpy.array(self.twiss_eps).transpose()
# Calculate normalized emittance:
# TODO: this is NOT correct normalization for longitudinal
for i in range(3):
Yngve Levinsen
committed
self.twiss_eps_normed[i] *= self.gamma * self.beta
# Calculate beta:
# This is a factor 10 different from what TraceWin plots
Yngve Levinsen
committed
self.twiss_beta = [[self.sigma[i, i, j] / self.twiss_eps[i // 2, j] for i in (0, 2, 4)] for j in range(len(self.Nelp))]
self.twiss_beta = numpy.array(self.twiss_beta).transpose()
Yngve Levinsen
committed
self.twiss_alpha = [[-self.sigma[i, i + 1, j] / self.twiss_eps[i // 2, j] for i in (0, 2, 4)] for j in range(len(self.Nelp))]
self.twiss_alpha = numpy.array(self.twiss_alpha).transpose()
Returns the dst corresponding to the given index
dset = self[index]
_dst = dst()
_dst.freq = self.freq
_dst.Ib = self.Ib * 1000
_dst._data = numpy.array([dset["x"], dset["xp"], dset["y"], dset["yp"], dset["phi"], dset["E"]]).transpose()
Saves the dst at the specified index to file
Returns the same dst object.
Simple class to read a TraceWin density file
into a pythonized object
Class afterwards hold the same items as
found in the TraceWin documentation:
z, nelp, ib, Np, Xouv, Youv, dXouv, ..
def __init__(self, filename, envelope=None):
if filename.split("/")[-1].split(".")[0] == "Density_Env":
# first we simply count how many elements we have:
self.Xouv = numpy.zeros(counter)
self.Youv = numpy.zeros(counter)
if self.version >= 9:
self.dXouv = numpy.zeros(counter)
self.dYouv = numpy.zeros(counter)
self.moy = numpy.zeros((counter, 7))
self.moy2 = numpy.zeros((counter, 7))
self._max = numpy.zeros((counter, 7))
self._min = numpy.zeros((counter, 7))
if self.version >= 11:
self.phaseF = numpy.zeros((counter))
self.phaseG = numpy.zeros((counter))
if self.version >= 10:
self.maxR = numpy.zeros((counter, 7))
self.minR = numpy.zeros((counter, 7))
if self.version >= 5:
self.rms_size = numpy.zeros((counter, 7))
self.rms_size2 = numpy.zeros((counter, 7))
if self.version >= 6:
self.min_pos_moy = numpy.zeros((counter, 7))
self.max_pos_moy = numpy.zeros((counter, 7))
if self.version >= 7:
self.rms_emit = numpy.zeros((counter, 3))
self.rms_emit2 = numpy.zeros((counter, 3))
if self.version >= 8:
self.energy_accept = numpy.zeros(counter)
self.phase_ouv_pos = numpy.zeros(counter)
self.phase_ouv_neg = numpy.zeros(counter)
self.lost = numpy.zeros((counter, self.Nrun))
self.powlost = numpy.zeros((counter, self.Nrun))
self.lost2 = numpy.zeros(counter)
self.Minlost = numpy.zeros(counter)
self.Maxlost = numpy.zeros(counter)
self.powlost2 = numpy.zeros(counter)
self.Minpowlost = numpy.zeros(counter)
self.Maxpowlost = numpy.zeros(counter)
self.i += 1
if sys.flags.debug and self.i % 100 == 0:
def _getHeader(self):
import numpy
# header..
version = numpy.fromfile(self.fin, dtype=numpy.int16, count=1)[0]
year = numpy.fromfile(self.fin, dtype=numpy.int16, count=1)[0]
# in case we did not read all data, this will detect our mistake:
shift = 0
while year != 2011 or version not in [8, 9, 10, 11, 12]:
shift += 1
version = year
year = numpy.fromfile(self.fin, dtype=numpy.int16, count=1)[0]
raise ValueError(f"ERROR, shifted {shift + 2} bytes")
self.vlong = numpy.fromfile(self.fin, dtype=numpy.int16, count=1)[0]
self.Nrun = numpy.fromfile(self.fin, dtype=numpy.int32, count=1)[0]
def _skipAndCount(self):
import numpy
self._getHeader()
if self.version == 8:
numpy.fromfile(self.fin, dtype=numpy.int16, count=292 // 2)
elif self.version == 9:
numpy.fromfile(self.fin, dtype=numpy.int16, count=300 // 2)
elif self.version == 10:
numpy.fromfile(self.fin, dtype=numpy.int16, count=356 // 2)
elif self.version == 11:
numpy.fromfile(self.fin, dtype=numpy.int16, count=364 // 2)
raise TypeError(f"It is not possible to read {self.filename}")
elif self.Nrun > 1:
# WARN not 100% sure if this is correct..
if self.version <= 9:
numpy.fromfile(self.fin, dtype=numpy.int16, count=((5588 + self.Nrun * 12) // 2))
numpy.fromfile(self.fin, dtype=numpy.int16, count=((20796 + self.Nrun * 12) // 2))
raise TypeError(f"It is not possible to read {self.filename}")
numpy.fromfile(self.fin, dtype=numpy.int16, count=8344 // 2)
elif self.version == 9:
numpy.fromfile(self.fin, dtype=numpy.int16, count=12352 // 2)
elif self.version == 10:
numpy.fromfile(self.fin, dtype=numpy.int16, count=12408 // 2)
elif self.version == 11:
numpy.fromfile(self.fin, dtype=numpy.int16, count=12416 // 2)
raise TypeError(f"It is not possible to read {self.filename}")
def _get_7dim_array(array):
return dict(
x=array[0],
y=array[1],
phase=array[2],
energy=array[3],
r=array[4],
z=array[5],
dpp=array[6],
)
def _getFullContent(self):
import numpy
# (though only if we are SURE about content!)
ver, year, vlong = numpy.fromfile(self.fin, dtype=numpy.int16, count=3)
if year != self.year:
raise ValueError(f"year doesn't match {self.year} vs {year} in {self.filename}")
if ver != self.version:
raise ValueError(f"version doesn't match {self.version} vs {ver} in {self.filename}")
numpy.fromfile(self.fin, dtype=numpy.int32, count=1)[0]
self.nelp[self.i] = numpy.fromfile(self.fin, dtype=numpy.int32, count=1)[0]
self.ib[self.i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=1)[0]
self.z[self.i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=1)[0]
self.Xouv[self.i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=1)[0]
self.Youv[self.i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=1)[0]
numpy.fromfile(self.fin, dtype=numpy.float32, count=1)[0]
numpy.fromfile(self.fin, dtype=numpy.float32, count=1)[0]
step = numpy.fromfile(self.fin, dtype=numpy.int32, count=1)[0]
n = 7 # x [m], y[m], Phase [deg], Energy [MeV], R[m], Z[m], dp/p
self.moy[self.i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=n)[:]
self.moy2[self.i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=n)[:]
self._max[self.i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=n)[:]
self._min[self.i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=n)[:]
if self.version >= 11:
self.phaseF[self.i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=1)[0]
self.phaseG[self.i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=1)[0]
self.maxR[self.i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=n)[:]
self.minR[self.i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=n)[:]
self.rms_size[self.i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=n)
self.rms_size2[self.i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=n)
self.min_pos_moy[self.i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=n)
self.max_pos_moy[self.i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=n)
self.rms_emit[self.i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=3)[:]
self.rms_emit2[self.i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=3)[:]
self.energy_accept[self.i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=1)
self.phase_ouv_pos[self.i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=1)
self.phase_ouv_neg[self.i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=1)
self.Np[self.i] = numpy.fromfile(self.fin, dtype=numpy.int64, count=1)[0]
for i in range(self.Nrun):
self.lost[self.i, i] = numpy.fromfile(self.fin, dtype=numpy.int64, count=1)[0]
self.powlost[self.i, i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=1)[0]
self.lost2[self.i] = numpy.fromfile(self.fin, dtype=numpy.int64, count=1)[0]
self.Minlost[self.i] = numpy.fromfile(self.fin, dtype=numpy.int64, count=1)[0]
self.Maxlost[self.i] = numpy.fromfile(self.fin, dtype=numpy.int64, count=1)[0]
self.powlost2[self.i] = numpy.fromfile(self.fin, dtype=numpy.float64, count=1)[0]
self.Minpowlost[self.i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=1)[0]
self.Maxpowlost[self.i] = numpy.fromfile(self.fin, dtype=numpy.float32, count=1)[0]
numpy.fromfile(self.fin, dtype=numpy.uint64, count=n * step)
numpy.fromfile(self.fin, dtype=numpy.uint32, count=n * step)
numpy.fromfile(self.fin, dtype=numpy.uint32, count=3 * step)
returns the average of the parameter
weighted by how many Nruns in self and other object
This allows for different lengths of the two arrays..
mine = getattr(self, param)
new = getattr(other, param)
if len(mine) > len(new):
ret[: len(new)] = (mine[: len(new)] * self.Nrun + new * other.Nrun) / (self.Nrun + other.Nrun)
ret[: len(mine)] = (mine * self.Nrun + new[: len(mine)] * other.Nrun) / (self.Nrun + other.Nrun)
ret = (mine * self.Nrun + new * other.Nrun) / (self.Nrun + other.Nrun)
returns the sum of the parameter
This allows for different lengths of the two arrays..
mine = getattr(self, param)
new = getattr(other, param)
if len(mine) > len(new):
returns the concatenation of the two matrices
This allows for different lengths of the two arrays/matrices..
mine = getattr(self, param)
new = getattr(other, param)
ret = numpy.zeros((max([len(mine), len(new)]), len(mine[0]) + len(new[0])))
ret[: len(mine), : len(mine[0])] = mine
ret[: len(new), len(mine[0]) :] = new
returns the function applied on the parameter
This allows for different lengths of the two arrays..
mine = getattr(self, param)
new = getattr(other, param)
if len(mine) > len(new):
ret[: len(new)] = function(mine[: len(new)], new)
ret[: len(mine)] = function(mine, new[: len(mine)])
raise TypeError("You tried to merge a non-list")
# for now we only allow objects with same version..
for o in objects:
if self.version != o.version:
raise ValueError("Cannot merge files with differing version")
# merge info..
for o in objects:
raise ValueError("Sorry, not implemented yet. Complain to Yngve")
# this looks strange to me, but it is what TraceWin does..
self.moy = self._sum_merge(o, "moy")
self.moy2 = self._sum_merge(o, "moy")
self._max = self._fun_merge(o, numpy.maximum, "_max")
self._min = self._fun_merge(o, numpy.minimum, "_min")
# this looks strange to me, but it is what TraceWin does..
self.rms_size = self._sum_merge(o, "rms_size")
self.rms_size2 = self._sum_merge(o, "rms_size2")
self.max_pos_moy = self._fun_merge(o, numpy.maximum, "max_pos_moy")
self.min_pos_moy = self._fun_merge(o, numpy.minimum, "min_pos_moy")
# this looks strange to me, but it is what TraceWin does..
self.rms_emit = self._sum_merge(o, "rms_emit")
self.rms_emit2 = self._sum_merge(o, "rms_emit2")
# Warning: TraceWin does NOT merge these data in any way
self.energy_accept = self._avg_merge(o, "energy_accept")
self.phase_ouv_pos = self._avg_merge(o, "phase_ouv_pos")
self.phase_ouv_neg = self._avg_merge(o, "phase_ouv_neg")
# Note, we don't get into the problem of differing table sizes
# particles are lost, because we have written zeroes for
# the rest of the tables
self.lost = self._concatenate_merge(o, "lost")
self.powlost = self._concatenate_merge(o, "powlost")
self.lost2 = self._sum_merge(o, "lost2")
self.powlost2 = self._sum_merge(o, "powlost2")
self.Minlost = self._fun_merge(o, numpy.minimum, "Minlost")
self.Maxlost = self._fun_merge(o, numpy.maximum, "Maxlost")
self.Minpowlost = self._fun_merge(o, numpy.minimum, "Minpowlost")
self.Maxpowlost = self._fun_merge(o, numpy.maximum, "Maxpowlost")
# Note: We are ignoring tab/tabp data...
# merge final info (make sure to do this last!)