16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
class AstraGenerator(CommandWrapper):
    """
    Class to run Astra's particle generator.

    The generator input namelist is held in ``.input`` (a dict) and the
    particles read back after a run are stored in ``.output['particles']``.

    The file for Astra is written in:
        .output_file
    """

    # NOTE(review): presumably the base class expands this environment
    # variable into ``self.command`` -- confirm against CommandWrapper.
    COMMAND = "$GENERATOR_BIN"

    def __init__(self, input_file=None, **kwargs):
        """
        Create the wrapper and, when an input is given, load and configure it.

        Parameters
        ----------
        input_file : str, dict or None
            Passed to the base-class constructor. If a string was given, the
            resulting ``self.input_file`` is remembered as the input file
            name; otherwise ``'generator.in'`` is used.
        **kwargs
            Forwarded unchanged to ``CommandWrapper.__init__``.
        """
        super().__init__(input_file=input_file, **kwargs)

        # Save init
        if isinstance(input_file, str):
            self.original_input_file = self.input_file
        else:
            self.original_input_file = 'generator.in'

        # These will be filled
        self.input = {}
        self.output = {}

        # Call configure
        if self.input_file:
            self.load_input(self.input_file)
            self.configure()

    def load_input(self, input_filepath, absolute_paths=True, **kwargs):
        """
        Load the generator input.

        Parameters
        ----------
        input_filepath : str or dict
            A dict is used directly as ``self.input`` (no copy is made).
            Anything else is handed to the base-class ``load_input``.
        absolute_paths : bool
            When true, ``parsers.fix_input_paths`` is applied to the parsed
            input with ``root=self.original_path``.
        **kwargs
            Forwarded to the base-class ``load_input``.
        """
        # Allow dict
        if isinstance(input_filepath, dict):
            self.input = input_filepath
            return

        super().load_input(input_filepath, **kwargs)

        if absolute_paths:
            parsers.fix_input_paths(self.input, root=self.original_path)

        # Only the 'input' namelist is kept; write_input_file re-wraps it.
        self.input = self.input['input']

    def input_parser(self, path):
        """Parse the Astra input file at ``path`` via ``parsers``."""
        return parsers.parse_astra_input_file(path)

    def configure(self):
        """
        Prepare the working directory and input/output file names.

        Sets ``self.input_file`` inside ``self.path``, forces the generator's
        output name (``input['fname']``) to ``'generator.part'`` and marks the
        object as configured.
        """
        # Check that binary exists
        # NOTE(review): assumes lumetools.full_path resolves/validates the
        # executable path -- confirm.
        self.command = lumetools.full_path(self.command)

        self.setup_workdir(self.path)

        self.input_file = os.path.join(self.path, self.original_input_file)

        # The run is executed with cwd=self.path, so a bare file name is
        # used here; see output_file for the full path.
        self.input['fname'] = 'generator.part'

        self.configured = True

    def run(self):
        """
        Runs Generator

        Writes the input file, executes the run script with
        ``cwd=self.path`` and loads the resulting particles. Any exception
        is reported with its traceback and recorded as ``self.error = True``
        instead of being propagated.

        Note: do not use os.chdir
        """
        self.write_input_file()

        runscript = self.get_run_script()

        try:
            res = tools.execute2(runscript, timeout=None, cwd=self.path)
            self.log = res['log']

            self.vprint(self.log)

            # This is the file that should be written
            if os.path.exists(self.output_file):
                self.finished = True
            else:
                print(f'AstraGenerator.output_file {self.output_file} does not exist.')
                print(f'Here is what the current working dir looks like: {os.listdir(self.path)}')

            # Attempted even when the file is missing, so that the failure
            # is routed through the handler below and flags self.error.
            self.load_output()

        except Exception:
            print('AstraGenerator.run exception:', traceback.format_exc())
            self.error = True

    @property
    def output_file(self):
        """Full path of the particle file: ``self.path`` joined with ``input['fname']``."""
        return os.path.join(self.path, self.input['fname'])

    def load_output(self):
        """Read the particle file and store a ParticleGroup in ``output['particles']``."""
        pfile = self.output_file
        data = parse_astra_phase_file(pfile)
        # Clock time is used when at cathode
        data['t'] = data['t_clock']
        P = ParticleGroup(data=data)

        self.output['particles'] = P

    def write_input_file(self):
        """Write ``self.input`` as the ``input`` namelist to ``self.input_file``."""
        writers.write_namelists({'input': self.input}, self.input_file)

    # Methods from CommandWrapper not implemented here
    def archive(self, h5=None):
        """
        Archive all data to an h5 handle or filename.

        If no file is given, a file based on the fingerprint will be created.

        Parameters
        ----------
        h5 : str, open h5 handle or None
            File name (environment variables are expanded) or an already
            open handle to write into.

        Returns
        -------
        The expanded file name, or the handle that was passed in.
        """
        if not h5:
            h5 = 'astra_generator_' + self.fingerprint() + '.h5'

        if isinstance(h5, str):
            h5 = os.path.expandvars(h5)
            g = h5py.File(h5, 'w')
            opened_here = True
            self.vprint(f'Archiving to file {h5}')
        else:
            # store directly in the given h5 handle
            g = h5
            opened_here = False

        try:
            # Write basic attributes
            archive.astra_init(g)

            # All input
            g2 = g.create_group('generator_input')
            for k, v in self.input.items():
                g2.attrs[k] = v
        finally:
            # Only close what this method opened; a caller-supplied handle
            # remains the caller's responsibility.
            if opened_here:
                g.close()

        return h5

    def load_archive(self, h5, configure=True):
        """
        Loads input and output from archived h5 file.

        Parameters
        ----------
        h5 : str or open h5 handle
            File name (environment variables are expanded) or an already
            open handle containing a ``generator_input`` group.
        configure : bool
            When true, ``self.configure()`` is called after loading.
        """
        if isinstance(h5, str):
            h5 = os.path.expandvars(h5)
            g = h5py.File(h5, 'r')
            opened_here = True
        else:
            g = h5
            opened_here = False

        try:
            # Copy the attributes out so the file can be closed right away.
            attrs = dict(g['generator_input'].attrs)
        finally:
            if opened_here:
                g.close()

        self.input = {k: tools.native_type(v) for k, v in attrs.items()}

        if configure:
            self.configure()

    def plot(self, y=..., x=None, xlim=None, ylim=None, ylim2=None, y2=..., nice=True, include_layout=True, include_labels=False, include_particles=True, include_legend=True, return_figure=False):
        """Forward all arguments unchanged to the base-class ``plot``."""
        return super().plot(y=y, x=x, xlim=xlim, ylim=ylim, ylim2=ylim2, y2=y2, nice=nice, include_layout=include_layout, include_labels=include_labels, include_particles=include_particles, include_legend=include_legend, return_figure=return_figure)

    def write_input(self, input_filename):
        """Forward to the base-class ``write_input``."""
        return super().write_input(input_filename)