Skip to content

Astra

Astra

Bases: CommandWrapper

Astra simulation object. Essential methods: .init(...) .configure() .run()

Input deck is held in .input Output data is parsed into .output .load_particles() will load particle data into .output['particles'][...]

The Astra binary file can be set on init. If it doesn't exist, configure will check the $ASTRA_BIN environmental variable.

Source code in astra/astra.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
class Astra(CommandWrapper):

    """ 
    Astra simulation object. Essential methods:
    .__init__(...)
    .configure()
    .run()

    Input deck is held in .input
    Output data is parsed into .output
    .load_particles() will load particle data into .output['particles'][...]

    The Astra binary file can be set on init. If it doesn't exist, configure will check the
        $ASTRA_BIN
    environmental variable.


    """
    MPI_SUPPORTED = False
    COMMAND = "$ASTRA_BIN"
    INPUT_PARSER = parsers.parse_astra_input_file

    def __init__(self, 
                 input_file=None,
                 *,                 
                 group=None,
                 **kwargs
                 ):
        super().__init__(input_file=input_file, **kwargs)
        # Save init
        self.original_input_file = self._input_file

        # These will be set
        self.log = []
        self.output = {'stats': {}, 'particles': {}, 'run_info': {}}
        self.group = {}  # Control Groups
        self.fieldmap = {}  # Fieldmaps

        # Call configure
        if self.input_file:
            self.load_input(self.input_file)
            self.configure()

            # Add groups, if any.
            if group:
                for k, v in group.items():
                    self.add_group(k, **v)
        else:            
            self.vprint('Using default input: 1 m drift lattice')
            self.original_input_file = 'astra.in'
            self.input = deepcopy(DEFAULT_INPUT)
            self.configure()

    def add_group(self, name, **kwargs):
        """
        Add a control group. See control.py

        Parameters
        ----------
        name : str
            The group name
        """
        assert name not in self.input, f'{name} not allowed to be overwritten by group.'
        if name in self.group:
            self.vprint(f'Warning: group {name} already exists, overwriting.')

        g = ControlGroup(**kwargs)
        g.link(self.input)
        self.group[name] = g

        return self.group[name]

    def clean_output(self):
        run_number = parsers.astra_run_extension(self.input['newrun']['run'])
        outfiles = parsers.find_astra_output_files(self.input_file, run_number)
        for f in outfiles:
            os.remove(f)

    def clean_particles(self):
        run_number = parsers.astra_run_extension(self.input['newrun']['run'])
        phase_files = parsers.find_phase_files(self.input_file, run_number)
        files = [x[0] for x in phase_files]  # This is sorted by approximate z
        for f in files:
            os.remove(f)

    # Convenience routines
    @property
    def particles(self):
        return self.output['particles']

    def stat(self, key):
        return self.output['stats'][key]

    def particle_stat(self, key, alive_only=True):
        """
        Compute a statistic from the particles.

        Alive particles have status == 1. By default, statistics will only be computed on these.

        n_dead will override the alive_only flag,
        and return the number of particles with status < -6 (Astra convention)
        """

        if key == 'n_dead':
            return np.array([len(np.where(P.status < -6)[0]) for P in self.particles])

        if key == 'n_alive':
            return np.array([len(np.where(P.status > -6)[0]) for P in self.particles])

        pstats = []
        for P in self.particles:
            if alive_only and P.n_dead > 0:
                P = P.where(P.status == 1)
            pstats.append(P[key])
        return np.array(pstats)


    def configure(self):
        self.setup_workdir(self._workdir)
        self.command = lumetools.full_path(self.command)
        self.vprint("Configured to run in:", self.path)
        self.input_file = os.path.join(self.path, self.original_input_file)
        self.configured = True    


 #   def configure(self):
 #       self.configure_astra(workdir=self.path)
#
 #   def configure_astra(self, input_filepath=None, workdir=None):
#
 #  #     if input_filepath:
 #  #         self.load_input(input_filepath)
#
 #       # Check that binary exists
 #       #self.command = lumetools.full_path(self.command)
#
 #       self.setup_workdir(self._workdir)
 #       #self.input_file = os.path.join(self.path, self.original_input_file)
 #       self.configured = True

    def load_fieldmaps(self, search_paths=[]):
        """
        Loads fieldmaps into Astra.fieldmap as a dict.

        Optionally, a list of paths can be included that will search for these. The default will search self.path.
        """

        # Do not consider files if fieldmaps have been loaded.
        if self.fieldmap:
            strip_path = False
        else:
            strip_path = True

        if not search_paths:
            search_paths = [self.path]

        self.fieldmap = load_fieldmaps(self.input, fieldmap_dict=self.fieldmap, search_paths=search_paths,
                                       verbose=self.verbose, strip_path=strip_path)

    def load_initial_particles(self, h5):
        """Loads a openPMD-beamphysics particle h5 handle or file"""
        P = ParticleGroup(h5=h5)
        self.initial_particles = P

    def input_parser(self, path):
        return parsers.parse_astra_input_file(path)

    def load_input(self, input_filepath, absolute_paths=True, **kwargs):
        super().load_input(input_filepath, **kwargs)
        if absolute_paths:
            parsers.fix_input_paths(self.input, root=self.original_path)

    def load_output(self, include_particles=True):
        """
        Loads Astra output files into .output

        .output is a dict with dicts:
            .stats
            .run_info
            .other

        and if include_particles,
            .particles = list of ParticleGroup objects

        """
        run_number = parsers.astra_run_extension(self.input['newrun']['run'])
        outfiles = parsers.find_astra_output_files(self.input_file, run_number)

        # assert len(outfiles)>0, 'No output files found'

        stats = self.output['stats'] = {}

        for f in outfiles:
            type = parsers.astra_output_type(f)
            d = parsers.parse_astra_output_file(f)
            if type in ['Cemit', 'Xemit', 'Yemit', 'Zemit']:
                stats.update(d)
            elif type in ['LandF']:
                self.output['other'] = d
            else:
                raise ValueError(f'Unknown output type: {type}')

        # Check that the lengths of all arrays are the same
        nlist = {len(stats[k]) for k in stats}

        assert len(nlist) == 1, f'Stat keys do not all have the same length: {[len(stats[k]) for k in stats]}'

        if include_particles:
            self.load_particles()

    def load_particles(self, end_only=False):
        # Clear existing particles
        self.output['particles'] = []

        # Sort files by approximate z
        run_number = parsers.astra_run_extension(self.input['newrun']['run'])
        phase_files = parsers.find_phase_files(self.input_file, run_number)
        files = [x[0] for x in phase_files]  # This is sorted by approximate z
        zapprox = [x[1] for x in phase_files]

        if end_only:
            files = files[-1:]
        if self.verbose:
            print('loading ' + str(len(files)) + ' particle files')
            print(zapprox)
        for f in files:
            pdat = parse_astra_phase_file(f)
            P = ParticleGroup(data=pdat)
            self.output['particles'].append(P)

    def run(self):
        self.run_astra()

    def run_astra(self, verbose=False, parse_output=True, timeout=None):
        """
        Runs Astra

        Changes directory, so does not work with threads.
        """
        if not self.configured:
            print('not configured to run')
            return

        run_info = self.output['run_info'] = {}

        t1 = time()
        run_info['start_time'] = t1

        # Write all input
        self.write_input()

        runscript = self.get_run_script()
        tools.make_executable(os.path.join(self.path, 'run'))
        run_info['run_script'] = ' '.join(runscript)

        if self.timeout:
            res = tools.execute2(runscript, timeout=timeout, cwd=self.path)
            log = res['log']
            self.error = res['error']
            run_info['why_error'] = res['why_error']
            # Log file must have this to have finished properly
            if log.find('finished simulation') == -1:
                raise ValueError("Couldn't find finished simulation")

        else:
            # Interactive output, for Jupyter
            log = []
            for path in tools.execute(runscript, cwd=self.path):
                self.vprint(path, end="")
                log.append(path)

        self.log = log

        if parse_output:
            self.load_output()

        run_info['run_time'] = time() - t1

        self.finished = True

        self.vprint(run_info)

    def units(self, key):
        if key in parsers.OutputUnits:
            return parsers.OutputUnits[key]
        else:
            return 'unknown unit'

    def load_archive(self, h5=None, configure=False):
        """
        Loads input and output from archived h5 file.

        See: Astra.archive
        """
        if isinstance(h5, str):
            h5 = os.path.expandvars(h5)
            g = h5py.File(h5, 'r')

            glist = archive.find_astra_archives(g)
            n = len(glist)
            if n == 0:
                # legacy: try top level
                message = 'legacy'
            elif n == 1:
                gname = glist[0]
                message = f'group {gname} from'
                g = g[gname]
            else:
                raise ValueError(f'Multiple archives found in file {h5}: {glist}')

            self.vprint(f'Reading {message} archive file {h5}')
        else:
            g = h5

        self.input = archive.read_input_h5(g['input'])
        self.output = archive.read_output_h5(g['output'])
        if 'initial_particles' in g:
            self.initial_particles = ParticleGroup(h5=g['initial_particles'])

        if 'fieldmap' in g:
            self.fieldmap = archive.read_fieldmap_h5(g['fieldmap'])

        if 'control_groups' in g:
            self.group = archive.read_control_groups_h5(g['control_groups'], verbose=self.verbose)

        self.vprint('Loaded from archive. Note: Must reconfigure to run again.')
        self.configured = False

        # Re-link groups
        # TODO: cleaner logic
        for _, cg in self.group.items():
            cg.link(self.input)

        if configure:
            self.configure()

    def archive(self, h5=None):
        """
        Archive all data to an h5 handle or filename.

        If no file is given, a file based on the fingerprint will be created.

        """
        if not h5:
            h5 = 'astra_' + self.fingerprint() + '.h5'

        if isinstance(h5, str):
            h5 = os.path.expandvars(h5)
            g = h5py.File(h5, 'w')
            self.vprint(f'Archiving to file {h5}')
        else:
            # store directly in the given h5 handle
            g = h5

        # Write basic attributes
        archive.astra_init(g)

        # Initial particles
        if self.initial_particles:
            self.initial_particles.write(g, name='initial_particles')

        # Fieldmaps
        if self.fieldmap:
            archive.write_fieldmap_h5(g, self.fieldmap, name='fieldmap')

        # All input
        archive.write_input_h5(g, self.input)

        # All output
        archive.write_output_h5(g, self.output)

        # Control groups
        if self.group:
            archive.write_control_groups_h5(g, self.group, name='control_groups')

        return h5

    def write_fieldmaps(self, path=None):
        """
        Writes any loaded fieldmaps to path
        """
        if path is None:
            path = self.path         

        if self.fieldmap:
            write_fieldmaps(self.fieldmap, path)
            self.vprint(f'{len(self.fieldmap)} fieldmaps written to {path}')

    def write_input(self, input_filename=None, path=None, make_symlinks=True):
        """
        Writes all input. If fieldmaps have been loaded, these will also be written.
        """

        if path is None:
            path = self.path        

        if self.initial_particles:
            fname = self.write_initial_particles(path=path)
            self.input['newrun']['distribution'] = fname

        self.write_fieldmaps(path=path)

        self.write_input_file(path=path, make_symlinks=make_symlinks)

    def write_input_file(self, path=None, make_symlinks=True):
        if path is None:
            path = self.path
            input_file = self.input_file
        else:
            input_file = os.path.join(path, 'astra.in')

        writers.write_namelists(self.input, input_file, make_symlinks=make_symlinks, verbose=self.verbose)

    def write_initial_particles(self, fname=None, path=None):
        if path is None:
            path = self.path  

        fname = fname or os.path.join(path, 'astra.particles')
        # 
        if len(self.initial_particles) == 1:
            probe = True
        else:
            probe = False
        self.initial_particles.write_astra(fname, probe=probe)
        self.vprint(f'Initial particles written to {fname}')
        return fname

    def plot(self, y=['sigma_x', 'sigma_y'], x='mean_z', xlim=None, ylim=None, ylim2=None, y2=[],
             nice=True,
             include_layout=True,
             include_labels=False,
             include_particles=True,
             include_legend=True,
             return_figure=False,
             **kwargs):
        """
        Plots stat output multiple keys.

        If a list of ykeys2 is given, these will be put on the right hand axis. This can also be given as a single key.

        Logical switches:
            nice: a nice SI prefix and scaling will be used to make the numbers reasonably sized. Default: True

            include_legend: The plot will include the legend.  Default: True

            include_particles: Plot the particle statistics as dots. Default: True

            include_layout: the layout plot will be displayed at the bottom.  Default: True

            include_labels: the layout will include element labels.  Default: False

            return_figure: return the figure object for further manipulation. Default: False

        If there is no output to plot, the fieldmaps will be plotted with .plot_fieldmaps

        """

        # Just plot fieldmaps if there are no stats
        if not self.output['stats']:
            return plot_fieldmaps(self, xlim=xlim, **kwargs)

        return plot_stats_with_layout(self, ykeys=y, ykeys2=y2,
                                      xkey=x, xlim=xlim, ylim=ylim, ylim2=ylim2,
                                      nice=nice,
                                      include_layout=include_layout,
                                      include_labels=include_labels,
                                      include_particles=include_particles,
                                      include_legend=include_legend,
                                      return_figure=return_figure,
                                      **kwargs)

    def plot_fieldmaps(self, **kwargs):
        return plot_fieldmaps(self, **kwargs)

    def __getitem__(self, key):
        """
        Convenience syntax to get a header or element attribute.

        Special syntax:

        end_X
            will return the final item in a stat array X
            Example:
            'end_norm_emit_x'

        particles:N
            will return a ParticleGroup N from the .particles list
            Example:
                'particles:-1'
                returns the readback of the final particles
        particles:N:Y
            ParticleGroup N's property Y
            Example:
                'particles:-1:sigma_x'
            returns sigma_x from the end of the particles list.


        See: __setitem__
        """

        # Object attributes
        if hasattr(self, key):
            return getattr(self, key)

            # Send back top level input (namelist) or group object.
        # Do not add these to __setitem__. The user shouldn't be allowed to change them as a whole,
        #   because it will break all the links.
        if key in self.group:
            return self.group[key]
        if key in self.input:
            return self.input[key]

        if key.startswith('end_'):
            key2 = key[len('end_'):]
            assert key2 in self.output['stats'], f'{key} does not have valid output stat: {key2}'
            return self.output['stats'][key2][-1]

        if key.startswith('particles:'):
            key2 = key[len('particles:'):]
            x = key2.split(':')
            if len(x) == 1:
                return self.particles[int(x[0])]
            else:
                return self.particles[int(x[0])][x[1]]

        # key isn't an ele or group, should have property s

        x = key.split(':')
        assert len(x) == 2, f'{x} was not found in group or input dict, so should have : '
        name, attrib = x[0], x[1]

        # Look in input and group
        if name in self.input:
            return self.input[name][attrib]
        elif name in self.group:
            return self.group[name][attrib]

    def __setitem__(self, key, item):
        """
        Convenience syntax to set namelist or group attribute.
        attribute_string should be 'header:key' or 'ele_name:key'

        Examples of attribute_string: 'header:Np', 'SOL1:solenoid_field_scale'

        Settable attributes can also be given:

        ['stop'] = 1.2345 will set Impact.stop = 1.2345

        """

        # Set attributes
        if hasattr(self, key):
            setattr(self, key, item)
            return

        # Must be in input or group
        name, attrib = key.split(':')
        if name in self.input:
            self.input[name][attrib] = item
        elif name in self.group:
            self.group[name][attrib] = item
        else:
            raise ValueError(f'{name} does not exist in eles or groups of the Impact object.')



    # Tracking
    #---------
    def track(self, particles, z=None):
        """
        Track a ParticleGroup. An optional stopping z can be given.

        If successful, returns a ParticleGroup with the final particles.

        Otherwise, returns None

        """

        self.initial_particles = particles
        if z is not None:
            self['output:zstop'] = z

        # Assure phase space output is turned on
        nr = self.input['newrun']
        if 'zphase' not in nr:
            nr['zphase'] = 1
        if nr['zphase'] < 1:
            nr['zphase'] = 1
        # Turn particle output on.
        nr['phases'] = True

        self.run()

        if 'particles' in self.output:
            if len(self.output['particles']) == 0:
                return None

            final_particles = self.output['particles'][-1]

            # Special case to remove probe particles
            if len(self.initial_particles) == 1:
                final_particles = final_particles[-1]        
            return final_particles

        else:
            return None       

    def track1(self,
                  x0=0,
                  px0=0,
                  y0=0,
                  py0=0,
                  z0=0,
                  pz0=1e-15,
                  t0=0,
                  weight=1,
                  status=1,
                  z=None, # final z
                  species='electron'):
        """
        Tracks a single particle with starting coordinates:
            x0, y0, z0 in meters
            px0, py0, pz0 in eV/c
            t0 in seconds

        to a position 'z' in meters

        If successful, returns a ParticleGroup with the final particle.

        Otherwise, returns None

        """
        p0 = single_particle(x=x0, px=px0, y=y0, py=py0, z=z0, pz=pz0, t=t0, weight=weight, status=status, species=species)
        return self.track(p0, z=z)



    @classmethod
    @functools.wraps(astra_from_tao) 
    def from_tao(cls, tao):
        return astra_from_tao(tao, cls=cls)    

__getitem__(key)

Convenience syntax to get a header or element attribute.

Special syntax:

end_X will return the final item in a stat array X Example: 'end_norm_emit_x'

particles:N will return a ParticleGroup N from the .particles list Example: 'particles:-1' returns the readback of the final particles particles:N:Y ParticleGroup N's property Y Example: 'particles:-1:sigma_x' returns sigma_x from the end of the particles list.

See: setitem

Source code in astra/astra.py
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
def __getitem__(self, key):
    """
    Convenience syntax to get a header or element attribute.

    Special syntax:

    end_X
        will return the final item in a stat array X
        Example:
        'end_norm_emit_x'

    particles:N
        will return a ParticleGroup N from the .particles list
        Example:
            'particles:-1'
            returns the readback of the final particles
    particles:N:Y
        ParticleGroup N's property Y
        Example:
            'particles:-1:sigma_x'
        returns sigma_x from the end of the particles list.


    See: __setitem__
    """

    # Object attributes
    if hasattr(self, key):
        return getattr(self, key)

        # Send back top level input (namelist) or group object.
    # Do not add these to __setitem__. The user shouldn't be allowed to change them as a whole,
    #   because it will break all the links.
    if key in self.group:
        return self.group[key]
    if key in self.input:
        return self.input[key]

    if key.startswith('end_'):
        key2 = key[len('end_'):]
        assert key2 in self.output['stats'], f'{key} does not have valid output stat: {key2}'
        return self.output['stats'][key2][-1]

    if key.startswith('particles:'):
        key2 = key[len('particles:'):]
        x = key2.split(':')
        if len(x) == 1:
            return self.particles[int(x[0])]
        else:
            return self.particles[int(x[0])][x[1]]

    # key isn't an ele or group, should have property s

    x = key.split(':')
    assert len(x) == 2, f'{x} was not found in group or input dict, so should have : '
    name, attrib = x[0], x[1]

    # Look in input and group
    if name in self.input:
        return self.input[name][attrib]
    elif name in self.group:
        return self.group[name][attrib]

__setitem__(key, item)

Convenience syntax to set namelist or group attribute. attribute_string should be 'header:key' or 'ele_name:key'

Examples of attribute_string: 'header:Np', 'SOL1:solenoid_field_scale'

Settable attributes can also be given:

['stop'] = 1.2345 will set Impact.stop = 1.2345

Source code in astra/astra.py
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
def __setitem__(self, key, item):
    """
    Convenience syntax to set namelist or group attribute.
    attribute_string should be 'header:key' or 'ele_name:key'

    Examples of attribute_string: 'header:Np', 'SOL1:solenoid_field_scale'

    Settable attributes can also be given:

    ['stop'] = 1.2345 will set Impact.stop = 1.2345

    """

    # Set attributes
    if hasattr(self, key):
        setattr(self, key, item)
        return

    # Must be in input or group
    name, attrib = key.split(':')
    if name in self.input:
        self.input[name][attrib] = item
    elif name in self.group:
        self.group[name][attrib] = item
    else:
        raise ValueError(f'{name} does not exist in eles or groups of the Impact object.')

add_group(name, **kwargs)

Add a control group. See control.py

Parameters

name : str The group name

Source code in astra/astra.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def add_group(self, name, **kwargs):
    """
    Add a control group. See control.py

    Parameters
    ----------
    name : str
        The group name
    """
    assert name not in self.input, f'{name} not allowed to be overwritten by group.'
    if name in self.group:
        self.vprint(f'Warning: group {name} already exists, overwriting.')

    g = ControlGroup(**kwargs)
    g.link(self.input)
    self.group[name] = g

    return self.group[name]

archive(h5=None)

Archive all data to an h5 handle or filename.

If no file is given, a file based on the fingerprint will be created.

Source code in astra/astra.py
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
def archive(self, h5=None):
    """
    Archive all data to an h5 handle or filename.

    If no file is given, a file based on the fingerprint will be created.

    """
    if not h5:
        h5 = 'astra_' + self.fingerprint() + '.h5'

    if isinstance(h5, str):
        h5 = os.path.expandvars(h5)
        g = h5py.File(h5, 'w')
        self.vprint(f'Archiving to file {h5}')
    else:
        # store directly in the given h5 handle
        g = h5

    # Write basic attributes
    archive.astra_init(g)

    # Initial particles
    if self.initial_particles:
        self.initial_particles.write(g, name='initial_particles')

    # Fieldmaps
    if self.fieldmap:
        archive.write_fieldmap_h5(g, self.fieldmap, name='fieldmap')

    # All input
    archive.write_input_h5(g, self.input)

    # All output
    archive.write_output_h5(g, self.output)

    # Control groups
    if self.group:
        archive.write_control_groups_h5(g, self.group, name='control_groups')

    return h5

load_archive(h5=None, configure=False)

Loads input and output from archived h5 file.

See: Astra.archive

Source code in astra/astra.py
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
def load_archive(self, h5=None, configure=False):
    """
    Loads input and output from archived h5 file.

    See: Astra.archive
    """
    if isinstance(h5, str):
        h5 = os.path.expandvars(h5)
        g = h5py.File(h5, 'r')

        glist = archive.find_astra_archives(g)
        n = len(glist)
        if n == 0:
            # legacy: try top level
            message = 'legacy'
        elif n == 1:
            gname = glist[0]
            message = f'group {gname} from'
            g = g[gname]
        else:
            raise ValueError(f'Multiple archives found in file {h5}: {glist}')

        self.vprint(f'Reading {message} archive file {h5}')
    else:
        g = h5

    self.input = archive.read_input_h5(g['input'])
    self.output = archive.read_output_h5(g['output'])
    if 'initial_particles' in g:
        self.initial_particles = ParticleGroup(h5=g['initial_particles'])

    if 'fieldmap' in g:
        self.fieldmap = archive.read_fieldmap_h5(g['fieldmap'])

    if 'control_groups' in g:
        self.group = archive.read_control_groups_h5(g['control_groups'], verbose=self.verbose)

    self.vprint('Loaded from archive. Note: Must reconfigure to run again.')
    self.configured = False

    # Re-link groups
    # TODO: cleaner logic
    for _, cg in self.group.items():
        cg.link(self.input)

    if configure:
        self.configure()

load_fieldmaps(search_paths=[])

Loads fieldmaps into Astra.fieldmap as a dict.

Optionally, a list of paths can be included that will search for these. The default will search self.path.

Source code in astra/astra.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def load_fieldmaps(self, search_paths=[]):
    """
    Loads fieldmaps into Astra.fieldmap as a dict.

    Optionally, a list of paths can be included that will search for these. The default will search self.path.
    """

    # Do not consider files if fieldmaps have been loaded.
    if self.fieldmap:
        strip_path = False
    else:
        strip_path = True

    if not search_paths:
        search_paths = [self.path]

    self.fieldmap = load_fieldmaps(self.input, fieldmap_dict=self.fieldmap, search_paths=search_paths,
                                   verbose=self.verbose, strip_path=strip_path)

load_initial_particles(h5)

Loads a openPMD-beamphysics particle h5 handle or file

Source code in astra/astra.py
198
199
200
201
def load_initial_particles(self, h5):
    """Loads a openPMD-beamphysics particle h5 handle or file"""
    P = ParticleGroup(h5=h5)
    self.initial_particles = P

load_output(include_particles=True)

Loads Astra output files into .output

.output is a dict with dicts: .stats .run_info .other

and if include_particles, .particles = list of ParticleGroup objects

Source code in astra/astra.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
def load_output(self, include_particles=True):
    """
    Loads Astra output files into .output

    .output is a dict with dicts:
        .stats
        .run_info
        .other

    and if include_particles,
        .particles = list of ParticleGroup objects

    """
    run_number = parsers.astra_run_extension(self.input['newrun']['run'])
    outfiles = parsers.find_astra_output_files(self.input_file, run_number)

    # assert len(outfiles)>0, 'No output files found'

    stats = self.output['stats'] = {}

    for f in outfiles:
        type = parsers.astra_output_type(f)
        d = parsers.parse_astra_output_file(f)
        if type in ['Cemit', 'Xemit', 'Yemit', 'Zemit']:
            stats.update(d)
        elif type in ['LandF']:
            self.output['other'] = d
        else:
            raise ValueError(f'Unknown output type: {type}')

    # Check that the lengths of all arrays are the same
    nlist = {len(stats[k]) for k in stats}

    assert len(nlist) == 1, f'Stat keys do not all have the same length: {[len(stats[k]) for k in stats]}'

    if include_particles:
        self.load_particles()

particle_stat(key, alive_only=True)

Compute a statistic from the particles.

Alive particles have status == 1. By default, statistics will only be computed on these.

n_dead will override the alive_only flag, and return the number of particles with status < -6 (Astra convention)

Source code in astra/astra.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def particle_stat(self, key, alive_only=True):
    """
    Compute a statistic from the particles.

    Alive particles have status == 1. By default, statistics will only be computed on these.

    n_dead will override the alive_only flag,
    and return the number of particles with status < -6 (Astra convention)
    """

    if key == 'n_dead':
        return np.array([len(np.where(P.status < -6)[0]) for P in self.particles])

    if key == 'n_alive':
        return np.array([len(np.where(P.status > -6)[0]) for P in self.particles])

    pstats = []
    for P in self.particles:
        if alive_only and P.n_dead > 0:
            P = P.where(P.status == 1)
        pstats.append(P[key])
    return np.array(pstats)

plot(y=['sigma_x', 'sigma_y'], x='mean_z', xlim=None, ylim=None, ylim2=None, y2=[], nice=True, include_layout=True, include_labels=False, include_particles=True, include_legend=True, return_figure=False, **kwargs)

Plots stat output multiple keys.

If a list of ykeys2 is given, these will be put on the right hand axis. This can also be given as a single key.

Logical switches

nice: a nice SI prefix and scaling will be used to make the numbers reasonably sized. Default: True

include_legend: The plot will include the legend. Default: True

include_particles: Plot the particle statistics as dots. Default: True

include_layout: the layout plot will be displayed at the bottom. Default: True

include_labels: the layout will include element labels. Default: False

return_figure: return the figure object for further manipulation. Default: False

If there is no output to plot, the fieldmaps will be plotted with .plot_fieldmaps

Source code in astra/astra.py
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
def plot(self, y=['sigma_x', 'sigma_y'], x='mean_z', xlim=None, ylim=None, ylim2=None, y2=[],
         nice=True,
         include_layout=True,
         include_labels=False,
         include_particles=True,
         include_legend=True,
         return_figure=False,
         **kwargs):
    """
    Plots stat output multiple keys.

    If a list of ykeys2 is given, these will be put on the right hand axis. This can also be given as a single key.

    Logical switches:
        nice: a nice SI prefix and scaling will be used to make the numbers reasonably sized. Default: True

        include_legend: The plot will include the legend.  Default: True

        include_particles: Plot the particle statistics as dots. Default: True

        include_layout: the layout plot will be displayed at the bottom.  Default: True

        include_labels: the layout will include element labels.  Default: False

        return_figure: return the figure object for further manipulation. Default: False

    If there is no output to plot, the fieldmaps will be plotted with .plot_fieldmaps

    """

    # Just plot fieldmaps if there are no stats
    if not self.output['stats']:
        return plot_fieldmaps(self, xlim=xlim, **kwargs)

    return plot_stats_with_layout(self, ykeys=y, ykeys2=y2,
                                  xkey=x, xlim=xlim, ylim=ylim, ylim2=ylim2,
                                  nice=nice,
                                  include_layout=include_layout,
                                  include_labels=include_labels,
                                  include_particles=include_particles,
                                  include_legend=include_legend,
                                  return_figure=return_figure,
                                  **kwargs)

run_astra(verbose=False, parse_output=True, timeout=None)

Runs Astra

Changes directory, so does not work with threads.

Source code in astra/astra.py
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
def run_astra(self, verbose=False, parse_output=True, timeout=None):
    """
    Runs Astra

    Changes directory, so does not work with threads.
    """
    if not self.configured:
        print('not configured to run')
        return

    run_info = self.output['run_info'] = {}

    t1 = time()
    run_info['start_time'] = t1

    # Write all input
    self.write_input()

    runscript = self.get_run_script()
    tools.make_executable(os.path.join(self.path, 'run'))
    run_info['run_script'] = ' '.join(runscript)

    if self.timeout:
        res = tools.execute2(runscript, timeout=timeout, cwd=self.path)
        log = res['log']
        self.error = res['error']
        run_info['why_error'] = res['why_error']
        # Log file must have this to have finished properly
        if log.find('finished simulation') == -1:
            raise ValueError("Couldn't find finished simulation")

    else:
        # Interactive output, for Jupyter
        log = []
        for path in tools.execute(runscript, cwd=self.path):
            self.vprint(path, end="")
            log.append(path)

    self.log = log

    if parse_output:
        self.load_output()

    run_info['run_time'] = time() - t1

    self.finished = True

    self.vprint(run_info)

track(particles, z=None)

Track a ParticleGroup. An optional stopping z can be given.

If successful, returns a ParticleGroup with the final particles.

Otherwise, returns None

Source code in astra/astra.py
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
def track(self, particles, z=None):
    """
    Track a ParticleGroup. An optional stopping z can be given.

    If successful, returns a ParticleGroup with the final particles.

    Otherwise, returns None

    """

    self.initial_particles = particles
    if z is not None:
        self['output:zstop'] = z

    # Assure phase space output is turned on
    nr = self.input['newrun']
    if 'zphase' not in nr:
        nr['zphase'] = 1
    if nr['zphase'] < 1:
        nr['zphase'] = 1
    # Turn particle output on.
    nr['phases'] = True

    self.run()

    if 'particles' in self.output:
        if len(self.output['particles']) == 0:
            return None

        final_particles = self.output['particles'][-1]

        # Special case to remove probe particles
        if len(self.initial_particles) == 1:
            final_particles = final_particles[-1]        
        return final_particles

    else:
        return None       

track1(x0=0, px0=0, y0=0, py0=0, z0=0, pz0=1e-15, t0=0, weight=1, status=1, z=None, species='electron')

Tracks a single particle with starting coordinates

x0, y0, z0 in meters px0, py0, pz0 in eV/c t0 in seconds

to a position 'z' in meters

If successful, returns a ParticleGroup with the final particle.

Otherwise, returns None

Source code in astra/astra.py
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
def track1(self,
              x0=0,
              px0=0,
              y0=0,
              py0=0,
              z0=0,
              pz0=1e-15,
              t0=0,
              weight=1,
              status=1,
              z=None, # final z
              species='electron'):
    """
    Tracks a single particle with starting coordinates:
        x0, y0, z0 in meters
        px0, py0, pz0 in eV/c
        t0 in seconds

    to a position 'z' in meters

    If successful, returns a ParticleGroup with the final particle.

    Otherwise, returns None

    """
    p0 = single_particle(x=x0, px=px0, y=y0, py=py0, z=z0, pz=pz0, t=t0, weight=weight, status=status, species=species)
    return self.track(p0, z=z)

write_fieldmaps(path=None)

Writes any loaded fieldmaps to path

Source code in astra/astra.py
416
417
418
419
420
421
422
423
424
425
def write_fieldmaps(self, path=None):
    """
    Writes any loaded fieldmaps to path
    """
    if path is None:
        path = self.path         

    if self.fieldmap:
        write_fieldmaps(self.fieldmap, path)
        self.vprint(f'{len(self.fieldmap)} fieldmaps written to {path}')

write_input(input_filename=None, path=None, make_symlinks=True)

Writes all input. If fieldmaps have been loaded, these will also be written.

Source code in astra/astra.py
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
def write_input(self, input_filename=None, path=None, make_symlinks=True):
    """
    Writes all input. If fieldmaps have been loaded, these will also be written.
    """

    if path is None:
        path = self.path        

    if self.initial_particles:
        fname = self.write_initial_particles(path=path)
        self.input['newrun']['distribution'] = fname

    self.write_fieldmaps(path=path)

    self.write_input_file(path=path, make_symlinks=make_symlinks)

recommended_spacecharge_mesh(n_particles)

! -------------------------------------------------------- ! Suggested Nrad, Nlong_in settings from: ! A. Bartnik and C. Gulliford (Cornell University) ! ! Nrad = 35, Nlong_in = 75 !28K ! Nrad = 29, Nlong_in = 63 !20K ! Nrad = 20, Nlong_in = 43 !10K ! Nrad = 13, Nlong_in = 28 !4K ! Nrad = 10, Nlong_in = 20 !2K ! Nrad = 8, Nlong_in = 16 !1K ! ! Nrad ~ round(3.3(n_particles/1000)^(2/3) + 5) ! Nlong_in ~ round(9.2(n_particles/1000)^(0.603) + 6.5) ! !

Source code in astra/astra.py
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
def recommended_spacecharge_mesh(n_particles):
    """
    ! --------------------------------------------------------
    ! Suggested Nrad, Nlong_in settings from:
    !    A. Bartnik and C. Gulliford (Cornell University)
    !
    ! Nrad = 35,    Nlong_in = 75  !28K
    ! Nrad = 29,    Nlong_in = 63  !20K
    ! Nrad = 20,    Nlong_in = 43  !10K
    ! Nrad = 13,    Nlong_in = 28  !4K
    ! Nrad = 10,    Nlong_in = 20  !2K
    ! Nrad = 8,     Nlong_in = 16  !1K
    !
    ! Nrad ~ round(3.3*(n_particles/1000)^(2/3) + 5)
    ! Nlong_in ~ round(9.2*(n_particles/1000)^(0.603) + 6.5)
    !
    ! 
    """
    if n_particles < 1000:
        # Set a minimum
        nrad = 8
        nlong_in = 16
    else:
        # Prefactors were recalculated from above note. 
        nrad = round(3.3e-2 * n_particles ** (2 / 3) + 5)
        nlong_in = round(0.143 * n_particles ** (0.603) + 6.5)
    return {'nrad': nrad, 'nlong_in': nlong_in}

run_astra(settings=None, astra_input_file=None, workdir=None, command='$ASTRA_BIN', timeout=2500, verbose=False)

Run Astra.

settings: dict with keys that can appear in an Astra input file.
Source code in astra/astra.py
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
def run_astra(settings=None,
              astra_input_file=None,
              workdir=None,
              command='$ASTRA_BIN',
              timeout=2500,
              verbose=False):
    """
    Run Astra. 

        settings: dict with keys that can appear in an Astra input file. 
    """
    if verbose:
        print('run_astra')

        # Make astra object
    A = Astra(command=command, input_file=astra_input_file, workdir=workdir)

    A.timeout = timeout
    A.verbose = verbose

    A.input['newrun']['l_rm_back'] = True  # Remove backwards particles

    # Set inputs
    if settings:
        set_astra(A, {}, settings, verbose=verbose)

    # Run
    A.run()

    return A

run_astra_with_generator(settings=None, astra_input_file=None, generator_input_file=None, workdir=None, command='$ASTRA_BIN', command_generator='$GENERATOR_BIN', timeout=2500, verbose=False, auto_set_spacecharge_mesh=True)

Run Astra with particles generated by Astra's generator.

settings: dict with keys that can appear in an Astra or Generator input file.
Source code in astra/astra.py
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
def run_astra_with_generator(settings=None,
                             astra_input_file=None,
                             generator_input_file=None,
                             workdir=None,
                             command='$ASTRA_BIN',
                             command_generator='$GENERATOR_BIN',
                             timeout=2500, verbose=False,
                             auto_set_spacecharge_mesh=True):
    """
    Run Astra with particles generated by Astra's generator. 

        settings: dict with keys that can appear in an Astra or Generator input file. 
    """

    assert astra_input_file, 'No astra input file'

    # Call simpler evaluation if there is no generator:
    if not generator_input_file:
        return run_astra(settings=settings,
                         astra_input_file=astra_input_file,
                         workdir=workdir,
                         command=command,
                         timeout=timeout,
                         verbose=verbose)

    if verbose:
        print('run_astra_with_generator')

        # Make astra and generator objects
    A = Astra(command=command, input_file=astra_input_file, workdir=workdir)
    A.timeout = timeout
    A.verbose = verbose
    G = AstraGenerator(command=command_generator, input_file=generator_input_file, workdir=workdir)
    G.verbose = verbose

    A.input['newrun']['l_rm_back'] = True  # Remove backwards particles

    # Set inputs
    if settings:
        set_astra(A, G.input, settings, verbose=verbose)

    # Attach generator input. This is non-standard. 
    A.generator_input = G.input        

    if auto_set_spacecharge_mesh:
        n_particles = G.input['ipart']
        sc_settings = recommended_spacecharge_mesh(n_particles)
        A.input['charge'].update(sc_settings)
        if verbose:
            print('set spacecharge mesh for n_particles:', n_particles, 'to', sc_settings)

            # Run Generator
    G.run()
    A.initial_particles = G.output['particles']
    A.run()
    if verbose:
        print('run_astra_with_generator finished')

    return A

set_astra(astra_object, generator_input, settings, verbose=False)

Searches astra and generator objects for keys in settings, and sets their values to the appropriate input

Source code in astra/astra.py
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
def set_astra(astra_object, generator_input, settings, verbose=False):
    """
    Searches astra and generator objects for keys in settings, and sets their values to the appropriate input
    """
    astra_input = astra_object.input  # legacy syntax

    for k, v in settings.items():
        found = False

        # Check for direct settable attribute
        if ':' in k:
            astra_object[k] = v
            continue

        for nl in astra_input:
            if k in astra_input[nl]:
                found = True
                if verbose:
                    print(k, 'is in astra', nl)
                astra_input[nl][k] = settings[k]
        if not found:
            if k in generator_input:
                found = True
                generator_input[k] = settings[k]
                if verbose:
                    print(k, 'is in generator')

        if not found and verbose:
            print(k, 'not found')
        assert found