# -- Lib import numpy as np from typing import List from ess import lib_tw # -- Input path_name_lat = "lattice.dat" path_name_lat_tmp = "lattice_tmp.dat" path_name_res = "tracewin.out" # -- How to use the LATTICE class lat = lib_tw.LATTICE(path_name_lat) # -- Pick up the line # (I'm calling "idx") and elem # (I'm calling "idx_elem") of SPK field maps idx_spk = [] idx_elem_spk = [] for lat_i in lat.lst: if lat_i.typ == "FIELD_MAP": if lat_i.name_fmap == "Spoke_W_coupler": idx_spk.append(lat_i.idx) idx_elem_spk.append(lat_i.idx_elem) # # This does the same thing... # for i in range(len(lat.lst)): # if lat.lst[i].typ=='FIELD_MAP': # if lat.lst[i].name_fmap=='Spoke_W_coupler': # idx_spk.append(i) # idx_elem_spk.append(lat.lst[i].idx_elem) print("idx (line #) of SPK field maps (starting from 0):") print(idx_spk, "\n") print("idx_elem (elem # of) SPK field maps (starting from 0):") print(idx_elem_spk, "\n") # -- Give a name "SPK2" to the 2nd SPK and pick up its idx and idx_elem from the name lat.lst[idx_spk[1]].name = "SPK2" for lat_i in lat.lst: if lat_i.name == "SPK2": idx_spk2 = lat_i.idx idx_elem_spk2 = lat_i.idx_elem print("idx (line #) of 2nd SPK:", idx_spk2) print("idx_elem (elem #) of 2nd SPK:", idx_elem_spk2, "\n") # -- Introduce an error command err_comm_name = "MY_ERROR_COMMAND" err_comm_typ = ( "ERROR_CAV_NCPL_STAT" # I know it's redundant but we also need this for now... ) err_comm_para = [0, 0, 0, 0, 0, 0, 0, 0, 0] # This also works: err_comm_para=[0]*100 err_comm = lib_tw.ERROR_CAV_NCPL_STAT(err_comm_name, err_comm_typ, err_comm_para) print("This is how it looks in the TraceWin syntax:") print(err_comm.get_tw(), "\n") # -- Change parameters of the error command err_comm.typ_dist = "0" # To make sure the dist type is constant err_comm.E = 2e-2 # To introduce 2% field error err_comm.phs_rf = 3.0 * np.pi / 180.0 # To introduce 3 deg phase error print("This is how the command looks after the changes:") print(err_comm.get_tw(), "\n") # -- Insert the command in front of the 2nd SPK with the index "idx_spk2" print("This is how the lattice looks around the 2nd SPK looks originally:") for i in range(idx_spk2 - 2, idx_spk2 + 3): print(lat.lst[i].get_tw()) print("") lat.lst.insert(idx_spk[1], err_comm) # This does the insertion lat.update_idx() # "magic word" to update idx and idx_elem # Update idx and idx_elem of SPK2 for lat_i in lat.lst: if lat_i.name == "SPK2": idx_spk2 = lat_i.idx idx_elem_spk2 = lat_i.idx_elem print("idx (line #) of 2nd SPK after the insertion:", idx_spk2) print("idx_elem (elem #) of 2nd SPK after the insertion:", idx_elem_spk2, "\n") print("Note idx is +1 but no change for idx_elem (since we inserted a command).\n") print("This is how the lattice looks around the 2nd SPK after the change:") for i in range(idx_spk2 - 2, idx_spk2 + 3): print(lat.lst[i].get_tw()) print("") # -- Save the new lattice to a file lat.get_tw(path_name_lat_tmp) # -- exit() # -- Steerers # Indices of steerers (can be done with names??) and append max to THIN_STEERING idx_st_x: List[int] = [] idx_st_y: List[int] = [] for i in range(len(lat.lst)): # STEERER if lat.lst[i].typ == "STEERER": for j in range(i)[::-1]: if lat.lst[j].typ == "ADJUST_STEERER": idx_st_x.append(i) idx_st_y.append(i) break if lat.lst[j].typ == "ADJUST_STEERER_BY": idx_st_x.append(i) break if lat.lst[j].typ == "ADJUST_STEERER_BX": idx_st_y.append(i) break if lat.lst[j].idx_elem != lat.lst[i].idx_elem: break # THIN_STEERING (assuming the same for x and y for the dual plane ones) if lat.lst[i].typ == "THIN_STEERING": for j in range(i)[::-1]: if lat.lst[j].typ == "ADJUST": lat.lst[i].max = lat.lst[j].max if lat.lst[j].var == 1: idx_st_y.append(i) if lat.lst[j].var == 2: idx_st_x.append(i) if lat.lst[j].idx_elem != lat.lst[i].idx_elem - 1: break # Indices of physical steerer locations for STEERER (later to extract L and s) idx_st_x_elem = [] for i in idx_st_x: if lat.lst[i].typ == "THIN_STEERING": idx_st_x_elem.append(i) if lat.lst[i].typ == "STEERER": for j in range(i + 1, len(lat.lst)): if lat.lst[j].idx_elem == lat.lst[i].idx_elem + 1: idx_st_x_elem.append(j) break idx_st_y_elem = [] for i in idx_st_y: if lat.lst[i].typ == "THIN_STEERING": idx_st_y_elem.append(i) if lat.lst[i].typ == "STEERER": for j in range(i + 1, len(lat.lst)): if lat.lst[j].idx_elem == lat.lst[i].idx_elem + 1: idx_st_y_elem.append(j) break # Check max is defined for i in set(idx_st_x + idx_st_y): if lat.lst[i].max <= 0: print( "Max B/BL not defined for elem #", str(lat.lst[i].idx_elem + 1), ". Exiting...", ) exit() # -- BPMs idx_bpm_x = [i for i in range(len(lat.lst)) if lat.lst[i].typ == "DIAG_POSITION"] idx_bpm_y = idx_bpm_x[:] ########################################################################### # Remove redundant BPMs to avoid over-constraint (not necessarily generic...) k = 0 while k < len(idx_bpm_x): if len([i for i in idx_st_x if i < idx_bpm_x[k]]) < k + 1: del idx_bpm_x[k] else: k += 1 k = 0 while k < len(idx_bpm_y): if len([i for i in idx_st_y if i < idx_bpm_y[k]]) < k + 1: del idx_bpm_y[k] else: k += 1 ########################################################################### idx_elem_bpm_x = [lat.lst[i].idx_elem for i in idx_bpm_x] idx_elem_bpm_y = [lat.lst[i].idx_elem for i in idx_bpm_y] ########################################################################### # To do 1-to-1 with the "twist" pattern # idx_elem_bpm[7],idx_elem_bpm[8]=idx_elem_bpm[8],idx_elem_bpm[7] ########################################################################### # YL comment: code below is broken (step_x, step_y, Nrun and more not defined) # Commented out so the flake8 test doesn't fail # # -- Define indices for blocks of R-matrix from "step" (may be too much...) # block_x = [] # for k in range(len(step_x) + 1): # if int(sum(step_x[:k])) >= len(idx_bpm_x): # block_x.append(len(idx_bpm_x)) # break # else: # block_x.append(int(sum(step_x[:k]))) # if block_x[-1] < len(idx_bpm_x): # block.append(len(idx_bpm_x)) # block_y = [] # for k in range(len(step_y) + 1): # if int(sum(step_y[:k])) >= len(idx_bpm_y): # block_y.append(len(idx_bpm_y)) # break # else: # block_y.append(int(sum(step_y[:k]))) # if block_y[-1] < len(idx_bpm_y): # block.append(len(idx_bpm_y)) # # -- R matrix # Rx = Pool(Ncpu).map(Rx_column, range(len(idx_st_x))) # Ry = Pool(Ncpu).map(Ry_column, range(len(idx_st_y))) # Rx = np.array(Rx).transpose() # Ry = np.array(Ry).transpose() # if len(Rx) > len(Rx[0]): # print("[# of BPMs] > [# of steerers] for x, exiting...") # exit() # if len(Ry) > len(Ry[0]): # print("[# of BPMs] > [# of steerers] for y, exiting...") # exit() # if len(Rx) < len(Rx[0]): # print("[# of BPMs] < [# of steerers] for x, exiting...") # exit() # if len(Ry) < len(Ry[0]): # print("[# of BPMs] < [# of steerers] for y, exiting...") # exit() # # -- Main part # data = Pool(Ncpu).map(job, range(Nrun)) # data = np.array(data).transpose() # s = data[0][0] # x = np.array(data[1].tolist()).transpose() # array not applied to the 3rd level ?? # y = np.array(data[2].tolist()).transpose() # array not applied to the 3rd level ?? # BLy = np.array(data[3].tolist()).transpose() # array not applied to the 3rd level ?? # BLx = np.array(data[4].tolist()).transpose() # array not applied to the 3rd level ?? # # -- Writing # # x # with open("x.out", "w") as file: # # Header # file.write("s ") # for n in range(Nrun): # file.write("x%03d " % n) # file.write("\n") # # Data # for k in range(len(s)): # file.write("%.4f " % s[k]) # for n in range(Nrun): # file.write("%.5f " % x[k][n]) # file.write("\n") # # y # with open("y.out", "w") as file: # # Header # file.write("s ") # for n in range(Nrun): # file.write("y%03d " % n) # file.write("\n") # # Data # for k in range(len(s)): # file.write("%.4f " % s[k]) # for n in range(Nrun): # file.write("%.5f " % y[k][n]) # file.write("\n") # # x steering (BLy [Gm]) # with open("st.x.out", "w") as file: # # Header # file.write("## s ") # for n in range(Nrun): # file.write("BLy%03d " % n) # file.write("\n") # # Data # for k in range(len(idx_st_x)): # file.write("%03d " % (k + 1) + "%.4f " % lat.lst[idx_st_x_elem[k]].s) # for n in range(Nrun): # file.write("%.4f " % (1e4 * BLy[k][n])) # file.write("\n") # # y steering (BLx [Gm]) # with open("st.y.out", "w") as file: # # Header # file.write("## s ") # for n in range(Nrun): # file.write("BLx%03d " % n) # file.write("\n") # # Data # for k in range(len(idx_st_y)): # file.write("%03d " % (k + 1) + "%.4f " % lat.lst[idx_st_y_elem[k]].s) # for n in range(Nrun): # file.write("%.4f " % (1e4 * BLx[k][n])) # file.write("\n") # -- Ending # -- Obsolete # def job(n): # ''' # Arbitrary x step correction for the random seed seed[n]. # ''' # # Indices of sub-matrices, e.g., step=[8,7] => block=[8,15] # block=[] # for k in range(len(step)+1): # if int(sum(step[:k]))>=len(idx_elem_bpm): block.append(len(idx_elem_bpm )); break # else : block.append(int(sum(step[:k]))) # if block[-1]<len(idx_elem_bpm): block.append(len(idx_elem_bpm)) # # Define child calc dir # path_cal_n=path_cal+'/tmp_'+str(n) # # Set-up TraceWin # opt_tw=setup_tw(path_cal_n,seed[n]) # lat_n =LATTICE(path_cal_n+'/'+file_name_lat[::-1].partition('/')[0][::-1],[]) # # Loop for steps # for b in range(len(block)-1): # # Borders of the sub-matrix # k0=block[b]; k1=block[b+1] # # Intermediate trajectory # lat_n.get_tw(path_cal_n+'/'+file_name_lat[::-1].partition('/')[0][::-1]) # call(opt_tw,shell=True) # tw=PARTRAN(path_cal_n+'/tracewin.out') # # Temp R matrices, using only the [k0,k1-1] block # Rx_k=zeros((len(Rx),len(Rx[0]))); Rx_k[k0:k1,k0:k1]=Rx[k0:k1,k0:k1] # Can't do this with a list # Ry_k=zeros((len(Ry),len(Ry[0]))); Ry_k[k0:k1,k0:k1]=Ry[k0:k1,k0:k1] # Can't do this with a list # # Correction # st_x_nom=lstsq(Rx_k,[-tw.x[i] for i in idx_elem_bpm])[0] # Can include BPM error here # st_y_nom=lstsq(Ry_k,[-tw.y[i] for i in idx_elem_bpm])[0] # Can include BPM error here # # Apply correction # for k in range(k0,k1): # i=idx_st_x[k]; st_x=st_x_nom[k]*lat.lst[i].max # if lat_n.lst[i].typ=='STEERER' : lat_n.lst[i].By =st_x # if lat_n.lst[i].typ=='THIN_STEERING': lat_n.lst[i].BLy=st_x # i=idx_st_y[k]; st_y=st_y_nom[k]*lat.lst[i].max # if lat_n.lst[i].typ=='STEERER' : lat_n.lst[i].Bx =st_y # if lat_n.lst[i].typ=='THIN_STEERING': lat_n.lst[i].BLx=st_y # # Trajectory after correction # lat_n.get_tw(path_cal_n+'/'+file_name_lat[::-1].partition('/')[0][::-1]) # call(opt_tw,shell=True) # tw=PARTRAN(path_cal_n+'/tracewin.out') # # Save steering strengths # BLy=[]; BLx=[] # for k in range(len(idx_st_x)): # i=idx_st_x[k]; j=idx_st_x_elem[k] # if lat_n.lst[i].typ=='STEERER' : BLy.append(lat_n.lst[i].By *lat_n.lst[j].L) # if lat_n.lst[i].typ=='THIN_STEERING': BLy.append(lat_n.lst[i].BLy ) # for k in range(len(idx_st_y)): # i=idx_st_y[k]; j=idx_st_y_elem[k] # if lat_n.lst[i].typ=='STEERER' : BLx.append(lat_n.lst[i].Bx *lat_n.lst[j].L) # if lat_n.lst[i].typ=='THIN_STEERING': BLx.append(lat_n.lst[i].BLx ) # # Clean # call('rm -rf '+path_cal_n,shell=True) # print 'job #'+str(n)+' done.' # return [tw.s,tw.x,tw.y,BLy,BLx]