import os
import abc
import dataclasses
import functools
import typing
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
import numpy as np
import numpy.typing as npt
import salvus.namespace as sn
from salvus.toolbox.helpers.wavefield_output import (
WavefieldOutput,
wavefield_output_to_xarray,
)
# Configuration settings
SALVUS_FLOW_SITE_NAME = os.environ.get("SITE_NAME", "local")
RANKS_PER_JOB = 4
PROJECT_DIR = "project"# Material properties
# P-wave velocity in m/s
VP_M_S = 5766.0
# S-wave velocity in m/s
VS_M_S = 3430.0
# Material density in kg/m^3
DENSITY_KG_M3 = 1000.0
# Beamforming parameters
# Focus point for spherical beamforming
SPHERICAL_FOCUS_POINT = (0.005, -0.02)
# Beam angle from vertical for plane wave
PLANE_WAVE_ANGLE_DEG = 20.0
# Transducer parameters
# Total number of transducers
NUM_TRANSDUCERS = 200
# Width of the transducer array in meters
TRANSDUCER_WIDTH_M = 0.05
# Calculate focus distance and time
FOCUS_DISTANCE = np.sqrt(
SPHERICAL_FOCUS_POINT[0] ** 2 + SPHERICAL_FOCUS_POINT[1] ** 2
)
# Time for P-wave to reach focus point
FOCUS_TIME_P_S = FOCUS_DISTANCE / VP_M_S
# Time for S-wave to reach focus point at intended focus from center of
# transducer array
ARRIVAL_TIME_S_S = FOCUS_DISTANCE / VS_M_S
# Simulation parameters
# Simulation start time
START_TIME_S = -5e-06
# End slightly after S-wave arrives at the P-wave focus point
END_TIME_S = ARRIVAL_TIME_S_S + 2e-06
# Source center frequency in Hz
CENTER_FREQUENCY_HZ = 2.0e6 # 2 MHz
# Visualization time selection (absolute times in seconds)
# Time 1: Early 1 - just as first sources fire
TIME_VIS_1 = -0.6e-06
# Time 2: Early 2 - just as the last sources fire
TIME_VIS_2 = 0.6e-06
# Time 3: Late - when P-wave focuses at spherical focus point
TIME_VIS_3 = FOCUS_TIME_P_S10 cm wide
(x-direction) and 5 cm deep (y-direction), relevant to high-frequency
ultrasonic simulations.domain = sn.domain.dim2.BoxDomain(x0=-0.05, x1=0.05, y0=-0.05, y1=0.0)
p = sn.Project.from_domain(
path=PROJECT_DIR, domain=domain, load_if_exists=True
)
p.viz.nb.domain()PlaneBeamforming class) and one to
create a focusing waves (FocusPointBeamforming class). These classes
both require get_delay, which we can formalize by creating an abstract
method in the abstract base class._TransducerBeamforming must implement a
get_delay() function, otherwise Python will raise an error (i.e., the
raise NotImplementedError should never actually run).class _TransducerBeamforming(metaclass=abc.ABCMeta):
"""
Base class for beamforming strategies.
"""
@abc.abstractmethod
def get_delay(
self, center_location: npt.NDArray, source_locations: npt.NDArray
) -> npt.NDArray:
"""
Calculate time delays for each source location.
Args:
center_location: Center point of the transducer array.
source_locations: Locations of individual sources.
Returns:
Array of time delays for each source.
"""
raise NotImplementedError
@dataclasses.dataclass
class PlaneBeamforming(_TransducerBeamforming):
"""
Plane wave beamforming to steer the beam at a specific angle.
Args:
angle_degree: The angle between the normal vector of the transducer
plane and the plane wave direction (in degrees).
velocity_m_s: Wave velocity in the medium, in m/s. For elastic media,
this will determine if the P-wave or S-wave is being steered.
"""
angle_degree: float
velocity_m_s: float
def get_delay(
self, center_location: npt.NDArray, source_locations: npt.NDArray
) -> npt.NDArray:
"""
Calculate delays for plane wave propagation at specified angle.
"""
angle_rad = np.deg2rad(self.angle_degree)
wavefront_normal = np.array([np.sin(angle_rad), -np.cos(angle_rad)])
source_deltas = source_locations - center_location
wavefront_distances = np.dot(source_deltas, wavefront_normal)
delays = wavefront_distances / self.velocity_m_s
return delays
@dataclasses.dataclass
class FocusPointBeamforming(_TransducerBeamforming):
"""
Spherical wave beamforming to focus energy at a given point.
Args:
focus_point: The point to focus the wavefield at.
velocity_m_s: Wave velocity in the medium, in m/s. For elastic media,
this will determine if the P-wave or S-wave is being steered.
"""
focus_point: npt.ArrayLike
velocity_m_s: float
def __post_init__(self):
self.focus_point = np.asarray(self.focus_point)
def get_delay(
self, center_location: npt.NDArray, source_locations: npt.NDArray
) -> npt.NDArray:
"""
Calculate delays based on distance from focus point.
"""
dist = np.linalg.norm(source_locations - self.focus_point, axis=1)
center_dist = np.linalg.norm(center_location - self.focus_point)
dist -= center_dist
delay = -dist / self.velocity_m_s
return delayTransducer2D class creates a linear array of point sources arranged
along the x-axis. Each source is assigned a time-delayed version (obtained
from the beamforming classes above) of the source time function to achieve
focusing.VectorPoint2D as the
source, the class needs to create a different source object for every
transducer location. Instead, we will pass a function that acts as a
template for creating these source objects, that only requires the x and y
coordinates of the source.@dataclasses.dataclass
class Transducer2D:
"""
A 2D transducer array implementation.
This class represents a linear array of point sources that can be
configured with different beamforming strategies to focus or steer
acoustic energy in 2D elastic media.
Args:
center_location: Center point of the transducer array.
transducer_spacing_m: Distance between adjacent transducers (pitch).
number_of_sources: Number of point sources in the array.
source_fct: Function to create source objects. Must accept x and y
coordinates and return a Salvus source object.
beamforming: Beamforming strategy to compute time delays.
start_time: Simulation start time in seconds.
end_time: Simulation end time in seconds.
"""
center_location: npt.NDArray[np.float_]
transducer_spacing_m: float
number_of_sources: int
source_fct: typing.Callable[
[float, float], sn.simple_config.source._BaseSource
]
beamforming: _TransducerBeamforming
start_time: float
end_time: float
def __post_init__(self) -> None:
self.center_location = np.asarray(self.center_location)
def get_point_source_locations(self) -> npt.NDArray:
"""
Return locations of point sources along x-axis.
Returns:
Array of shape (number_of_sources, 2) containing (x, y)
coordinates for each point source.
"""
total_extent = self.transducer_spacing_m * (self.number_of_sources - 1)
half_extent = total_extent / 2.0
return np.column_stack(
[
self.center_location[0]
+ np.linspace(
-half_extent, half_extent, self.number_of_sources
),
np.full(self.number_of_sources, self.center_location[1]),
]
)
def get_sources(self) -> list[sn.simple_config.source._BaseSource]:
"""
Create Salvus source objects for each point in the array.
Returns:
List of Salvus source objects positioned at each transducer
location.
"""
return [
self.source_fct(x=x, y=y)
for x, y in self.get_point_source_locations()
]
def get_source_time_functions(
self, source_time_function_template: sn.simple_config.stf._Base
) -> tuple[float, float, list[sn.simple_config.stf.Custom]]:
"""
Generate time-delayed source time functions for beamforming.
Apply beamforming time delays to the template source time function
to create individual time-shifted versions for each transducer.
Args:
source_time_function_template: Template source time function to
be time-delayed for each transducer.
Returns:
Tuple containing (adjusted_start_time, adjusted_end_time,
list_of_delayed_source_time_functions).
"""
delay = self.beamforming.get_delay(
center_location=self.center_location,
source_locations=self.get_point_source_locations(),
)
time, _ = source_time_function_template.get_stf()
def _apply_time_shift(
source_time_function_template: sn.simple_config.stf._Base,
time_shift_in_seconds: float,
) -> sn.simple_config.stf._Base:
"""
Adjust start time by applying a time shift.
Args:
source_time_function_template: Template source time function to
be time-delayed for each transducer.
time_shift_in_seconds: Time shift to shift the start time.
Returns:
A new source time function with the adjusted time interval.
"""
stf = source_time_function_template.copy()
# Adjust start and end times by the time shift.
stf._initial_arguments["time_shift_in_seconds"] = (
time_shift_in_seconds
)
return stf
return (
time[0] - delay.min(),
time[-1] + delay.max(),
[
_apply_time_shift(
source_time_function_template=source_time_function_template,
time_shift_in_seconds=float(d),
)
for d in delay
],
)
def get_event_configuration(
self, source_time_function_template: sn.simple_config.stf._Base
) -> sn.EventConfiguration:
"""
Create event configuration with time-delayed sources.
Generate a complete Salvus event configuration suitable for use
in a simulation, with all sources properly time-delayed according
to the beamforming strategy.
Args:
source_time_function_template: Template source time function
to apply to all transducers.
Returns:
Salvus EventConfiguration object with time-delayed sources.
"""
(
_,
_,
source_time_functions,
) = self.get_source_time_functions(
source_time_function_template=source_time_function_template
)
wsc = sn.WaveformSimulationConfiguration(
start_time_in_seconds=float(self.start_time),
end_time_in_seconds=float(self.end_time),
)
return sn.EventConfiguration(
wavelet=source_time_functions,
waveform_simulation_configuration=wsc,
)functools.partial function allows us to pre-configure the source
creation function. Since Transducer2D needs a function that only takes x
and y coordinates, we use functools.partial to create a function that
already has the force components (fx=0.0, fy=-1.0) set. When the transducer
later calls this function with x and y coordinates, it creates a
VectorPoint2D source with both the position and force components filled in.
This approach keeps the Transducer2D class flexible -- feel free to use
different source types or force directions by changing what you pass to
functools.partial.# Spherical wave beamforming - focuses at a point
transducer_spacing = TRANSDUCER_WIDTH_M / (NUM_TRANSDUCERS - 1)
transducer_spherical = Transducer2D(
center_location=np.array([0.0, 0.0]),
transducer_spacing_m=transducer_spacing,
number_of_sources=NUM_TRANSDUCERS,
source_fct=functools.partial(
sn.simple_config.source.cartesian.VectorPoint2D, fx=0.0, fy=-1.0
),
start_time=START_TIME_S,
end_time=END_TIME_S,
beamforming=FocusPointBeamforming(
focus_point=SPHERICAL_FOCUS_POINT,
velocity_m_s=VP_M_S,
),
)
# Plane wave beamforming - steers at an angle
transducer_plane = Transducer2D(
center_location=np.array([0.0, 0.0]),
transducer_spacing_m=transducer_spacing,
number_of_sources=NUM_TRANSDUCERS,
source_fct=functools.partial(
sn.simple_config.source.cartesian.VectorPoint2D, fx=0.0, fy=-1.0
),
start_time=START_TIME_S,
end_time=END_TIME_S,
beamforming=PlaneBeamforming(
angle_degree=PLANE_WAVE_ANGLE_DEG,
velocity_m_s=VP_M_S,
),
)# Obtain sources from both transducer configurations
sources_spherical = transducer_spherical.get_sources()
sources_plane = transducer_plane.get_sources()
# Store names so we can rerun the notebook without duplicating events
event_spherical_name = (
f"event_spherical_{SPHERICAL_FOCUS_POINT[0]:.5f}_"
+ f"{SPHERICAL_FOCUS_POINT[1]:.5f}"
)
event_plane_name = f"event_plane_{PLANE_WAVE_ANGLE_DEG}"
# Create events for both beamforming strategies
p += sn.Event(
event_name=event_spherical_name, sources=sources_spherical, receivers=None
)
p += sn.Event(
event_name=event_plane_name, sources=sources_plane, receivers=None
)event_configuration part, where we use the
get_event_configuration() method of the transducer classes to create the
appropriate event configuration with time-delayed sources. Additionally, we
add absorbing boundaries to the model to prevent reflections from the
domain edges, to simulate the block as if it was much larger.# Simulation configuration for spherical beamforming
p.add_to_project(
sn.SimulationConfiguration(
name="sim_spherical",
elements_per_wavelength=2,
tensor_order=1,
max_frequency_in_hertz=CENTER_FREQUENCY_HZ,
model_configuration=sn.ModelConfiguration(
background_model=sn.model.background.homogeneous.IsotropicElastic(
vp=VP_M_S,
rho=DENSITY_KG_M3,
vs=VS_M_S,
)
),
event_configuration=transducer_spherical.get_event_configuration(
source_time_function_template=sn.simple_config.stf.Ricker(
center_frequency=CENTER_FREQUENCY_HZ,
)
),
absorbing_boundaries=sn.AbsorbingBoundaryParameters(
reference_velocity=1500.0,
number_of_wavelengths=4.0,
reference_frequency=CENTER_FREQUENCY_HZ,
free_surface=["y1"],
),
),
overwrite=False,
)
# Simulation configuration for plane wave beamforming
p.add_to_project(
sn.SimulationConfiguration(
name="sim_plane",
elements_per_wavelength=2,
tensor_order=1,
max_frequency_in_hertz=CENTER_FREQUENCY_HZ,
model_configuration=sn.ModelConfiguration(
background_model=sn.model.background.homogeneous.IsotropicElastic(
vp=VP_M_S,
rho=DENSITY_KG_M3,
vs=VS_M_S,
)
),
event_configuration=transducer_plane.get_event_configuration(
source_time_function_template=sn.simple_config.stf.Ricker(
center_frequency=CENTER_FREQUENCY_HZ,
)
),
absorbing_boundaries=sn.AbsorbingBoundaryParameters(
reference_velocity=1500.0,
number_of_wavelengths=4.0,
reference_frequency=CENTER_FREQUENCY_HZ,
free_surface=["y1"],
),
),
overwrite=False,
)p.viz.nb.simulation_setup("sim_spherical", [event_spherical_name])[2025-11-12 21:43:14,439] INFO: Creating mesh. Hang on.
<salvus.flow.simple_config.simulation.waveform.Waveform object at 0x7c4b8e143f90>
p.viz.nb.simulation_setup("sim_plane", [event_plane_name])<salvus.flow.simple_config.simulation.waveform.Waveform object at 0x7c4bb6792a90>
# Launch spherical beamforming simulation
p.simulations.launch(
simulation_configuration="sim_spherical",
events=[event_spherical_name],
site_name=SALVUS_FLOW_SITE_NAME,
ranks_per_job=RANKS_PER_JOB,
extra_output_configuration={
"volume_data": {
"fields": ["gradient-of-displacement"],
"sampling_interval_in_time_steps": 10,
}
},
)
# Launch plane wave beamforming simulation
p.simulations.launch(
simulation_configuration="sim_plane",
events=[event_plane_name],
site_name=SALVUS_FLOW_SITE_NAME,
ranks_per_job=RANKS_PER_JOB,
extra_output_configuration={
"volume_data": {
"fields": ["gradient-of-displacement"],
"sampling_interval_in_time_steps": 10,
}
},
)
p.simulations.query(block=True, ping_interval_in_seconds=4)[2025-11-12 21:43:15,993] INFO: Submitting job ... Uploading 1 files... 🚀 Submitted job_2511122143136747_f03e941a0e@local [2025-11-12 21:43:16,495] INFO: Submitting job ... Uploading 1 files... 🚀 Submitted job_2511122143584977_ab385b2090@local
True
# Load wavefield outputs for both simulations
output_dir_spherical = p.simulations.get_simulation_output_directory(
"sim_spherical", event_spherical_name
)
output_dir_plane = p.simulations.get_simulation_output_directory(
"sim_plane", event_plane_name
)
wo_spherical = WavefieldOutput.from_file(
output_dir_spherical / "volume_data_output.h5",
"gradient-of-displacement",
"volume",
)
wo_plane = WavefieldOutput.from_file(
output_dir_plane / "volume_data_output.h5",
"gradient-of-displacement",
"volume",
)
# Extract to regular grid
nx, ny = 200, 100
x0, x1 = domain.bounds.hc["x"]
y0, y1 = domain.bounds.vc["y"]
x = np.linspace(x0, x1, nx)
y = np.linspace(y0, y1, ny)
wavefield_grad_spherical = wavefield_output_to_xarray(wo_spherical, [x, y])
wavefield_grad_plane = wavefield_output_to_xarray(wo_plane, [x, y])
print(f"Spherical wavefield shape: {wavefield_grad_spherical.shape}")
print(f"Plane wavefield shape: {wavefield_grad_plane.shape}")Spherical wavefield shape: (91, 4, 200, 100) Plane wavefield shape: (91, 4, 200, 100)
# Compute P-wave and S-wave components for both simulations
# P-wave component: divergence = du_x/dx + du_y/dy
# Component indices: 0 = du_x/dx, 3 = du_y/dy
p_wave_spherical = wavefield_grad_spherical.isel(
c=0
) + wavefield_grad_spherical.isel(c=3)
p_wave_plane = wavefield_grad_plane.isel(c=0) + wavefield_grad_plane.isel(c=3)
# S-wave component: curl = du_y/dx - du_x/dy
# Component indices: 2 = du_y/dx, 1 = du_x/dy
s_wave_spherical = wavefield_grad_spherical.isel(
c=2
) - wavefield_grad_spherical.isel(c=1)
s_wave_plane = wavefield_grad_plane.isel(c=2) - wavefield_grad_plane.isel(c=1)
print(f"Spherical P-wave shape: {p_wave_spherical.shape}")
print(f"Plane P-wave shape: {p_wave_plane.shape}")Spherical P-wave shape: (91, 200, 100) Plane P-wave shape: (91, 200, 100)
# Select time snapshots to visualize using absolute time values
time_snapshots = [TIME_VIS_1, TIME_VIS_2, TIME_VIS_3]def wavefield_to_rgb(data, cmap_name, vmin, vmax):
"""
Convert wavefield data to RGB image using a colormap.
Args:
data: 2D array of wavefield values
cmap_name: Name of colormap
vmin, vmax: Value range for normalization
Returns:
RGB image array with values in [0, 1]
"""
norm = mcolors.Normalize(vmin=vmin, vmax=vmax, clip=True)
normalized = norm(data)
cmap = plt.get_cmap(cmap_name)
rgba = cmap(normalized)
return rgba[..., :3].copy()
def blend_images_multiply(rgb1, rgb2):
"""
Blend two RGB images using multiply blending mode.
Args:
rgb1: First RGB image
rgb2: Second RGB image
Returns:
Blended RGB image with values in [0, 1]
"""
blended = rgb1 * rgb2
return np.clip(blended, 0, 1)# Colormap for P-waves
p_wave_cmap = "PRGn"
# Colormap for S-waves
s_wave_cmap = "RdBu"
# Create overlay plots using RGB blending
fig, axes = plt.subplots(2, 3, figsize=(12, 5))
fig.suptitle("P-wave and S-wave phasing for both arrays", fontsize=14, y=0.98)
# Change this to adjust color clipping, this adds contrast to low amplitude
# areas, but loses some dynamic range in high amplitude areas
clipping_ratio = 1.0
time_idxs = [
int(np.abs(p_wave_spherical.t.values - time_val_s).argmin())
for time_val_s in time_snapshots
]
selected_p_wave_spherical = p_wave_spherical.isel(t=time_idxs)
selected_s_wave_spherical = s_wave_spherical.isel(t=time_idxs)
selected_p_wave_plane = p_wave_plane.isel(t=time_idxs)
selected_s_wave_plane = s_wave_plane.isel(t=time_idxs)
# Normalize wavefields for consistent color scaling
selected_p_wave_spherical /= np.abs(selected_p_wave_spherical).max()
selected_s_wave_spherical /= np.abs(selected_s_wave_spherical).max()
selected_p_wave_plane /= np.abs(selected_p_wave_plane).max()
selected_s_wave_plane /= np.abs(selected_s_wave_plane).max()
for idx, time_idx in enumerate(time_idxs):
# Select nearest time index for this absolute time
time_val = float(p_wave_spherical.t[time_idx].values) * 1e6
# Spherical beamforming - RGB blend
ax = axes[0, idx]
p_rgb = wavefield_to_rgb(
selected_p_wave_spherical.isel(t=idx).T.values,
p_wave_cmap,
-selected_p_wave_spherical.max() / clipping_ratio,
selected_p_wave_spherical.max() / clipping_ratio,
)
s_rgb = wavefield_to_rgb(
selected_s_wave_spherical.isel(t=idx).T.values,
s_wave_cmap,
-selected_s_wave_spherical.max() / clipping_ratio,
selected_s_wave_spherical.max() / clipping_ratio,
)
blended = blend_images_multiply(p_rgb, s_rgb)
ax.imshow(
blended,
extent=[x[0], x[-1], y[0], y[-1]],
origin="lower",
aspect="auto",
interpolation="bilinear",
)
# Draw circle around focus point
focus_circle = plt.Circle(
SPHERICAL_FOCUS_POINT,
radius=0.006,
fill=False,
edgecolor="black",
linestyle=":",
linewidth=1.5,
alpha=0.3,
label="Focus point" if idx == 0 else None,
)
ax.add_patch(focus_circle)
# Add row title to center column
if idx == 1:
ax.set_title(f"Spherical\nt = {time_val:.2f} μs")
else:
ax.set_title(f"t = {time_val:.2f} μs")
ax.set_aspect("equal")
# Only label leftmost axes
if idx == 0:
ax.set_ylabel("y [m]")
ax.legend(loc="lower left")
else:
ax.set_yticklabels([])
# No x labels on top row
ax.set_xticklabels([])
# Plane wave beamforming - RGB blend
ax = axes[1, idx]
p_rgb = wavefield_to_rgb(
selected_p_wave_plane.isel(t=idx).T.values,
p_wave_cmap,
-selected_p_wave_plane.max() / clipping_ratio,
selected_p_wave_plane.max() / clipping_ratio,
)
s_rgb = wavefield_to_rgb(
selected_s_wave_plane.isel(t=idx).T.values,
s_wave_cmap,
-selected_s_wave_plane.max() / clipping_ratio,
selected_s_wave_plane.max() / clipping_ratio,
)
blended = blend_images_multiply(p_rgb, s_rgb)
ax.imshow(
blended,
extent=[x[0], x[-1], y[0], y[-1]],
origin="lower",
aspect="auto",
interpolation="bilinear",
)
# Add beam direction line from vertical
angle_rad = np.radians(PLANE_WAVE_ANGLE_DEG)
# Line from (0, 0) downward at given angle
line_length = abs(y[0]) # Distance to bottom of domain
x_end = line_length * np.tan(angle_rad)
# Array extent (from -0.025 to 0.025)
array_half_width = TRANSDUCER_WIDTH_M / 2.0
# Center beam line (in legend)
ax.plot(
[0, x_end],
[0, y[0]],
"k--",
linewidth=1.5,
alpha=0.3,
label="Beam direction",
)
# Edge beam lines (fainter, not in legend)
x_end_left = -array_half_width + line_length * np.tan(angle_rad)
x_end_right = array_half_width + line_length * np.tan(angle_rad)
ax.plot(
[-array_half_width, x_end_left],
[0, y[0]],
"k--",
linewidth=1.0,
alpha=0.15,
)
ax.plot(
[array_half_width, x_end_right],
[0, y[0]],
"k--",
linewidth=1.0,
alpha=0.15,
)
ax.set_xlim(x[0], x[-1])
ax.set_ylim(y[0], y[-1])
# Add row title to center column
if idx == 1:
ax.set_title(
f"Plane Wave ({PLANE_WAVE_ANGLE_DEG}°)\nt = {time_val:.2f} μs"
)
else:
ax.set_title(f"t = {time_val:.2f} μs")
ax.set_aspect("equal")
# Only label leftmost axes
if idx == 0:
ax.set_ylabel("y [m]")
ax.legend(loc="lower left")
else:
ax.set_yticklabels([])
# All bottom row axes have x labels
ax.set_xlabel("x [m]")
# Add colorbars centered on the right
fig.subplots_adjust(right=0.88)
# P-wave colorbar (centered vertically)
cax_p = fig.add_axes([0.90, 0.55, 0.02, 0.30])
plt.colorbar(
plt.cm.ScalarMappable(
cmap=p_wave_cmap,
norm=mcolors.Normalize(vmin=-1, vmax=1),
),
cax=cax_p,
label="P-wave",
)
# S-wave colorbar (centered vertically)
cax_s = fig.add_axes([0.90, 0.15, 0.02, 0.30])
_ = plt.colorbar(
plt.cm.ScalarMappable(
cmap=s_wave_cmap,
norm=mcolors.Normalize(vmin=-1, vmax=1),
),
cax=cax_s,
label="S-wave",
)# Print arrival time information
print(f"P-wave focus point: {SPHERICAL_FOCUS_POINT}")
print(f"P-wave arrival time: {FOCUS_TIME_P_S*1e6:.3f} μs")
print(f"S-wave arrival time: {ARRIVAL_TIME_S_S*1e6:.3f} μs")
times_s_wave_spherical = [
FOCUS_TIME_P_S,
0.5 * (FOCUS_TIME_P_S + ARRIVAL_TIME_S_S),
ARRIVAL_TIME_S_S,
]
# Create plot for spherical beamforming at P-wave and S-wave arrival times
fig, axes = plt.subplots(1, 3, figsize=(12, 4))
fig.suptitle(
"Spherical Beamforming: Wave Arrivals at Focus Point", fontsize=14
)
for idx, time_val_s in enumerate(times_s_wave_spherical):
# Select nearest time index for this absolute time
time_idx = int(np.abs(p_wave_spherical.t.values - time_val_s).argmin())
time_val = float(p_wave_spherical.t[time_idx].values) * 1e6
ax = axes[idx]
p_rgb = wavefield_to_rgb(
p_wave_spherical.isel(t=time_idx).T.values,
p_wave_cmap,
-p_wave_spherical.max() / clipping_ratio,
p_wave_spherical.max() / clipping_ratio,
)
s_rgb = wavefield_to_rgb(
s_wave_spherical.isel(t=time_idx).T.values,
s_wave_cmap,
-s_wave_spherical.max() / clipping_ratio,
s_wave_spherical.max() / clipping_ratio,
)
blended = blend_images_multiply(p_rgb, s_rgb)
ax.imshow(
blended,
extent=[x[0], x[-1], y[0], y[-1]],
origin="lower",
aspect="auto",
interpolation="bilinear",
)
# Draw circle around P-wave focus point
focus_circle = plt.Circle(
SPHERICAL_FOCUS_POINT,
radius=0.006,
fill=False,
edgecolor="black",
linestyle=":",
linewidth=1.5,
label="Focus point",
)
ax.add_patch(focus_circle)
# Add title with time
if idx == 0:
ax.set_title(f"P-wave arrival\nt = {time_val:.2f} μs")
elif idx == 1:
ax.set_title(f"Between arrivals\nt = {time_val:.2f} μs")
else:
ax.set_title(f"S-wave arrival\nt = {time_val:.2f} μs")
ax.set_aspect("equal")
ax.set_xlabel("x [m]")
# Only label leftmost axis
if idx == 0:
ax.set_ylabel("y [m]")
ax.legend(loc="lower left")
else:
ax.set_yticklabels([])
# Add colorbars on the right
fig.subplots_adjust(right=0.88)
# P-wave colorbar
cax_p = fig.add_axes([0.90, 0.55, 0.02, 0.30])
plt.colorbar(
plt.cm.ScalarMappable(
cmap=p_wave_cmap,
norm=mcolors.Normalize(vmin=-1, vmax=1),
),
cax=cax_p,
label="P-wave",
)
# S-wave colorbar
cax_s = fig.add_axes([0.90, 0.15, 0.02, 0.30])
_ = plt.colorbar(
plt.cm.ScalarMappable(
cmap=s_wave_cmap,
norm=mcolors.Normalize(vmin=-1, vmax=1),
),
cax=cax_s,
label="S-wave",
)P-wave focus point: (0.005, -0.02) P-wave arrival time: 3.575 μs S-wave arrival time: 6.010 μs