pygrt.utils 源代码

"""
    :file:     utils.py  
    :author:   Zhu Dengda (zhudengda@mail.iggcas.ac.cn)  
    :date:     2024-07-24  

    该文件包含一些数据处理操作上的补充:   
        1、剪切源、单力源、爆炸源、矩张量源 通过格林函数合成理论地震图的函数\n
        2、Stream类型的时域卷积、微分、积分 (基于numpy和scipy)    \n
        3、Stream类型写到本地sac文件,自定义名称    \n
        4、读取波数积分和峰谷平均法过程文件  \n
        5、其它辅助函数  \n

"""


import numpy as np
import numpy.ctypeslib as npct
import matplotlib.pyplot as plt
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from obspy import Stream, Trace 
from obspy.core import AttribDict
from copy import deepcopy
from scipy.signal import oaconvolve
from scipy.fft import rfft, irfft
from scipy.special import jv
from scipy.interpolate import interpn
import math 
import os
import glob
from typing import List, Union
from copy import deepcopy

from numpy.typing import ArrayLike

from .c_interfaces import *


__all__ = [
    "gen_syn_from_gf_DC",
    "gen_syn_from_gf_SF",
    "gen_syn_from_gf_EX",
    "gen_syn_from_gf_MT",

    "compute_strain",
    "compute_rotation",
    "compute_stress",

    "stream_convolve",
    "stream_integral",
    "stream_diff",
    "stream_write_sac",

    "read_kernels_freqs",
    "read_statsfile",
    "read_statsfile_ptam",
    "plot_statsdata",
    "plot_statsdata_ptam",

    "solve_lamb1"
]


#=================================================================================================================
#
#                                           根据辐射因子合成地震图
#
#=================================================================================================================

def _gen_syn_from_gf(st:Stream, calc_upar:bool, compute_type:str, M0:float, az:float, ZNE=False, **kwargs):
    r"""
        一个发起函数,根据不同震源参数,从格林函数中合成理论地震图

        :param    st:              计算好的时域格林函数, :class:`obspy.Stream` 类型
        :param    calc_upar:       是否计算位移u的空间导数
        :param    compute_type:    计算类型,应为以下之一: 
                                    'COMPUTE_EX'(爆炸源), 'COMPUTE_SF'(单力源),
                                    'COMPUTE_DC'(剪切源), 'COMPUTE_MT'(矩张量源)
        :param    M0:              标量地震矩, 单位dyne*cm
        :param    az:              方位角(度)
        :param    ZNE:             是否以ZNE分量输出?
            
    """
    chs = ZRTchs
    sacin_prefixes = ["", "z", "r", ""]   # 输入通道名
    sacout_prefixes = ["", "z", "r", "t"]   # 输出通道名
    srcName = ["EX", "VF", "HF", "DD", "DS", "SS"]
    allchs = [tr.stats.channel for tr in st]

    baz = 180 + az
    if baz > 360:
        baz -= 360

    azrad = np.deg2rad(az)

    calcUTypes = 4 if calc_upar else 1

    stall = Stream()

    dist = st[0].stats.sac['dist']
    upar_scale:float = 1.0
    for ityp in range(calcUTypes):
        if ityp > 0:
            upar_scale = 1e-5
        if ityp == 3:
            upar_scale /= dist

        srcRadi = _set_source_radi(ityp==3, upar_scale, compute_type, M0, azrad, **kwargs)

        inpref = sacin_prefixes[ityp]
        outpref = sacout_prefixes[ityp]

        for c in range(CHANNEL_NUM):
            ch = chs[c]
            tr:Trace = st[0].copy()
            tr.data[:] = 0.0
            tr.stats.channel = kcmpnm = f'{outpref}{ch}'
            __check_trace_attr_sac(tr, az=az, baz=baz, kcmpnm=kcmpnm)
            for k in range(SRC_M_NUM):
                coef = srcRadi[k, c]
                if coef==0.0:
                    continue

                # 读入数据
                channel = f'{inpref}{srcName[k]}{ch}'
                if channel not in allchs:
                    raise ValueError(f"Failed, channel=\"{channel}\" not exists.")
                    
                tr0 = st.select(channel=channel)[0].copy()
                
                tr.data += coef*tr0.data

            stall.append(tr)

    if ZNE:
        stall = _data_zrt2zne(stall)
            
    return stall


def _gen_syn_from_static_gf(grn:dict, calc_upar:bool, compute_type:str, M0:float, ZNE=False, **kwargs):
    r"""
        一个发起函数,根据不同震源参数,从静态格林函数中合成理论静态场

        :param    grn:             计算好的静态格林函数, 字典类型
        :param    calc_upar:       是否计算位移u的空间导数
        :param    compute_type:    计算类型,应为以下之一: 
                                    'COMPUTE_EX'(爆炸源), 'COMPUTE_SF'(单力源),
                                    'COMPUTE_DC'(剪切源), 'COMPUTE_MT'(矩张量源)
        :param    M0:              标量地震矩, 单位dyne*cm
        :param    ZNE:             是否以ZNE分量输出?
            
    """
    chs = ZRTchs
    sacin_prefixes = ["", "z", "r", ""]   # 输入通道名
    srcName = ["EX", "VF", "HF", "DD", "DS", "SS"]
    allchs = list(grn.keys())

    calcUTypes = 4 if calc_upar else 1

    xarr:np.ndarray = grn['_xarr']
    yarr:np.ndarray = grn['_yarr']

    # 结果字典
    resDct = {}

    # 基本数据拷贝
    for k in grn.keys():
        if k[0] != '_':
            continue 
        resDct[k] = deepcopy(grn[k])

    XX = np.zeros((calcUTypes, 3, len(xarr), len(yarr)), dtype='f8')
    dblsyn = (c_double*3)()
    dblupar = (c_double*9)()

    for iy in range(len(yarr)):
        for ix in range(len(xarr)):
            # 震中距
            dist = max(np.sqrt(xarr[ix]**2 + yarr[iy]**2), 1e-5)

            # 方位角
            azrad = np.arctan2(yarr[iy], xarr[ix])

            upar_scale:float = 1.0
            for ityp in range(calcUTypes):
                if ityp > 0:
                    upar_scale = 1e-5
                if ityp == 3:
                    upar_scale /= dist

                srcRadi = _set_source_radi(ityp==3, upar_scale, compute_type, M0, azrad, **kwargs)

                inpref = sacin_prefixes[ityp]

                for c in range(CHANNEL_NUM):
                    ch = chs[c]
                    for k in range(SRC_M_NUM):
                        coef = srcRadi[k, c]
                        if coef==0.0:
                            continue

                        # 读入数据
                        channel = f'{inpref}{srcName[k]}{ch}'
                        if channel not in allchs:
                            raise ValueError(f"Failed, channel=\"{channel}\" not exists.")
                            
                        XX[ityp, c, ix, iy] += coef*grn[channel][ix, iy]

            if ZNE:
                for i in range(3):
                    dblsyn[i] = XX[0, i, ix, iy]
                    if calc_upar:
                        for k in range(3):
                            dblupar[k + i*3] = XX[i+1, k, ix, iy]
                if calc_upar:
                    C_grt_rot_zrt2zxy_upar(azrad, dblsyn, dblupar, dist*1e5)
                else:
                    C_grt_rot_zxy2zrt_vec(-azrad, dblsyn)

                for i in range(3):
                    XX[0, i, ix, iy] = dblsyn[i]
                    if calc_upar:
                        for k in range(3):
                            XX[i+1, k, ix, iy] = dblupar[k + i*3]


    # 将XX数组分到字典中
    if ZNE:
        chs = ZNEchs

    for ityp in range(calcUTypes):
        c1 = '' if ityp==0 else chs[ityp-1].lower()
        for c in range(3):
            resDct[f"{c1}{chs[c]}"] = XX[ityp, c].copy()
                
    return resDct


def _data_zrt2zne(stall:Stream):
    r"""
        将位移分量和位移空间导数分量转为ZNE坐标系

        :param     stall:     柱坐标系(zrt)下合成地震图

        :return:
            - **stream** - :class:`obspy.Stream` 类型
    """

    chs = ZRTchs

    synLst:List[Trace] = []  # 顺序要求Z, R, T
    uparLst:List[Trace] = [] # 顺序要求zXXZ, zXXR, zXXT, rXXZ, rXXR, rXXT, tXXZ, tXXR, tXXT
    stsyn_upar = Stream()
    for ch in chs:
        st = stall.select(channel=f"{ch}")
        if len(st) == 1:
            synLst.append(st[0])
        
        for ch2 in chs:
            st = stall.select(channel=f"{ch.lower()}{ch2}")
            if len(st) == 1:
                uparLst.append(st[0])

    if len(synLst) != 3:
        raise ValueError(f"WRONG! synLst should have 3 components.")
    if len(stsyn_upar) != 0 and len(stsyn_upar) != 9:
        raise ValueError(f"WRONG! stsyn_upar should have 0 or 9 components.")

    
    # 是否有空间导数
    doupar = (len(uparLst) == 9)
    
    nt = stall[0].stats.npts
    azrad = np.deg2rad(stall[0].stats.sac['az'])
    dist = stall[0].stats.sac['dist']

    dblsyn = (c_double * 3)()
    dbleupar = (c_double * 9)()

    # 对每一个时间点
    for n in range(nt):
        # 复制数据
        for i1 in range(3):
            dblsyn[i1] = synLst[i1].data[n]
            if doupar:
                for i2 in range(3):
                    dbleupar[i2 + i1*3] = uparLst[i2 + i1*3].data[n]
        
        if doupar:
            C_grt_rot_zrt2zxy_upar(azrad, dblsyn, dbleupar, dist*1e5)
        else:
            C_grt_rot_zxy2zrt_vec(-azrad, dblsyn)

        # 将结果写入原数组
        for i1 in range(3):
            synLst[i1].data[n] = dblsyn[i1]
            if doupar:
                for i2 in range(3):
                    uparLst[i2 + i1*3].data[n] = dbleupar[i2 + i1*3]

    # 修改通道名
    for i1 in range(3):
        ch1 = ZNEchs[i1]
        tr = synLst[i1]
        tr.stats.channel = tr.stats.sac['kcmpnm'] = f'{ch1}'
        if doupar:
            for i2 in range(3):
                ch2 = ZNEchs[i2]
                tr = uparLst[i2 + i1*3]
                tr.stats.channel = tr.stats.sac['kcmpnm'] = f'{ch1.lower()}{ch2}'

    stres = Stream()
    stres.extend(synLst)
    if doupar:
        stres.extend(uparLst)

    return stres


def _set_source_radi(
    par_theta:bool, coef:float, compute_type:str, M0:float, azrad:float,
    fZ=None, fN=None, fE=None, 
    strike=None, dip=None, rake=None, 
    MT=None, **kwargs):
    r"""
        设置不同震源的方向因子矩阵

        :param    par_theta:       是否求对theta的偏导
        :param    coef:            比例系数
        :param    compute_type:    计算类型,应为以下之一: 
                                    'COMPUTE_EX'(爆炸源), 'COMPUTE_SF'(单力源),
                                    'COMPUTE_DC'(剪切源), 'COMPUTE_MT'(矩张量源)
        :param    M0:              地震矩
        :param    azrad:           方位角(弧度)

        - 其他参数根据计算类型可选:
            - 单力源需要: fZ, fN, fE, 
            - 剪切源需要: strike, dip, rake
            - 矩张量源需要: MT=(Mxx, Mxy, Mxz, Myy, Myz, Mzz)
    """
    
    caz = np.cos(azrad)
    saz = np.sin(azrad)

    src_coef = np.zeros((SRC_M_NUM, CHANNEL_NUM), dtype='f8')
    
    # 计算乘法因子
    if compute_type == 'COMPUTE_SF':
        mult = 1e-15 * M0 * coef 
    else:
        mult = 1e-20 * M0 * coef 

    # 根据不同计算类型处理
    if compute_type == 'COMPUTE_EX':
        # 爆炸源情况
        src_coef[0, 0] = src_coef[0, 1] = 0.0 if par_theta else mult  # Z/R分量
        src_coef[0, 2] = 0.0  # T分量
    
    elif compute_type == 'COMPUTE_SF':
        # 单力源情况
        # 计算各向异性系数
        A0 = fZ * mult
        A1 = (fN * caz + fE * saz) * mult
        A4 = (-fN * saz + fE * caz) * mult

        # 设置震源系数矩阵 (公式4.6.20)
        src_coef[1, 0] = src_coef[1, 1] = 0.0 if par_theta else A0  # VF, Z/R
        src_coef[2, 0] = src_coef[2, 1] = A4 if par_theta else A1  # HF, Z/R
        src_coef[1, 2] = 0.0  # VF, T
        src_coef[2, 2] = -A1 if par_theta else A4  # HF, T
    
    elif compute_type == 'COMPUTE_DC':
        # 剪切源情况 (公式4.8.35)
        # 计算各种角度值(转为弧度)
        stkrad = np.deg2rad(strike)  # 走向角
        diprad = np.deg2rad(dip)    # 倾角
        rakrad = np.deg2rad(rake)   # 滑动角
        therad = azrad - stkrad  # 方位角与走向角差
        
        # 计算各种三角函数值
        srak = np.sin(rakrad);      crak = np.cos(rakrad)
        sdip = np.sin(diprad);      cdip = np.cos(diprad)
        sdip2 = 2.0 * sdip * cdip;  cdip2 = 2.0 * cdip**2 - 1.0
        sthe = np.sin(therad);      cthe = np.cos(therad)
        sthe2 = 2.0 * sthe * cthe;  cthe2 = 2.0 * cthe**2 - 1.0

        # 计算各向异性系数
        A0 = mult * (0.5 * sdip2 * srak)
        A1 = mult * (cdip * crak * cthe - cdip2 * srak * sthe)
        A2 = mult * (0.5 * sdip2 * srak * cthe2 + sdip * crak * sthe2)
        A4 = mult * (-cdip2 * srak * cthe - cdip * crak * sthe)
        A5 = mult * (sdip * crak * cthe2 - 0.5 * sdip2 * srak * sthe2)

        # 设置震源系数矩阵
        src_coef[3, 0] = src_coef[3, 1] = 0.0 if par_theta else A0  # DD, Z/R
        src_coef[4, 0] = src_coef[4, 1] = A4 if par_theta else A1  # DS, Z/R
        src_coef[5, 0] = src_coef[5, 1] = 2.0 * A5 if par_theta else A2  # SS, Z/R
        src_coef[3, 2] = 0.0  # DD, T
        src_coef[4, 2] = -A1 if par_theta else A4  # DS, T
        src_coef[5, 2] = -2.0 * A2 if par_theta else A5  # DS, T
    
    elif compute_type == 'COMPUTE_MT':
        # 矩张量源情况 (公式4.9.7,修改了各向同性项)
        # 初始化矩张量分量
        M11, M12, M13, M22, M23, M33 = MT
        
        # 计算各向同性部分并减去
        Mexp = (M11 + M22 + M33) / 3.0
        M11 -= Mexp
        M22 -= Mexp
        M33 -= Mexp
        
        # 计算方位角的2倍角三角函数
        caz2 = np.cos(2 * azrad)
        saz2 = np.sin(2 * azrad)
        
        # 计算各向异性系数
        A0 = mult * ((2.0 * M33 - M11 - M22) / 6.0)
        A1 = mult * (- (M13 * caz + M23 * saz))
        A2 = mult * (0.5 * (M11 - M22) * caz2 + M12 * saz2)
        A4 = mult * (M13 * saz - M23 * caz)
        A5 = mult * (-0.5 * (M11 - M22) * saz2 + M12 * caz2)

        # 设置震源系数矩阵
        src_coef[0, 0] = src_coef[0, 1] = 0.0 if par_theta else mult * Mexp  # EX, Z/R
        src_coef[3, 0] = src_coef[3, 1] = 0.0 if par_theta else A0  # DD, Z/R
        src_coef[4, 0] = src_coef[4, 1] = A4 if par_theta else A1  # DS, Z/R
        src_coef[5, 0] = src_coef[5, 1] = 2.0 * A5 if par_theta else A2  # SS, Z/R
        src_coef[0, 2] = 0.0  # EX, T
        src_coef[3, 2] = 0.0  # DD, T
        src_coef[4, 2] = -A1 if par_theta else A4  # DS, T
        src_coef[5, 2] = -2.0 * A2 if par_theta else A5  # DS, T


    return src_coef


[文档] def gen_syn_from_gf_DC(st:Union[Stream,dict], M0:float, strike:float, dip:float, rake:float, az:float=-999, ZNE=False, calc_upar:bool=False): ''' Shear source, the unit of angles is all degrees(°) :param st: Green's functions in a :class:`obspy.Stream` (dynamic-case) or a dict (static-case) :param M0: scalar seismic moment (dyne*cm) :param strike: 0 <= strike <= 360 (north=0, clockwise as positive) :param dip: 0 <= dip <= 90 :param rake: -180 <= rake <= 180 (on the fault plane, counterclockwise as positive) :param az: azimuth, 0 <= az <= 360 (not used for static case) :param ZNE: whether output in 'ZNE'-coord, default is 'ZRT' :param calc_upar: whether calculate the spatial derivatives of displacements. :return: - **stream** - :class:`obspy.Stream` ''' if isinstance(st, Stream): if az > 360 or az < -360: raise ValueError(f"WRONG azimuth ({az})") return _gen_syn_from_gf(st, calc_upar, "COMPUTE_DC", M0, az, ZNE, strike=strike, dip=dip, rake=rake) elif isinstance(st, dict): return _gen_syn_from_static_gf(st, calc_upar, "COMPUTE_DC", M0, ZNE, strike=strike, dip=dip, rake=rake) else: raise NotImplementedError
[文档] def gen_syn_from_gf_SF(st:Union[Stream,dict], S:float, fN:float, fE:float, fZ:float, az:float=-999, ZNE=False, calc_upar:bool=False): ''' Single-force source (dyne) :param st: Green's functions in a :class:`obspy.Stream` (dynamic-case) or a dict (static-case) :param S: scaling factor (dyne) :param fN: coefficient of Northward force :param fE: coefficient of Eastward force :param fZ: coefficient of Vertical(Downward) force :param az: azimuth, 0 <= az <= 360 (not used for static case) :param ZNE: whether output in 'ZNE'-coord, default is 'ZRT' :param calc_upar: whether calculate the spatial derivatives of displacements. :return: - **stream** - :class:`obspy.Stream` ''' if isinstance(st, Stream): if az > 360 or az < -360: raise ValueError(f"WRONG azimuth ({az})") return _gen_syn_from_gf(st, calc_upar, "COMPUTE_SF", S, az, ZNE, fN=fN, fE=fE, fZ=fZ) elif isinstance(st, dict): return _gen_syn_from_static_gf(st, calc_upar, "COMPUTE_SF", S, ZNE, fN=fN, fE=fE, fZ=fZ) else: raise NotImplementedError
[文档] def gen_syn_from_gf_EX(st:Union[Stream,dict], M0:float, az:float=-999, ZNE=False, calc_upar:bool=False): ''' Explosion :param st: Green's functions in a :class:`obspy.Stream` (dynamic-case) or a dict (static-case) :param M0: scalar seismic moment (dyne*cm) :param az: azimuth, 0 <= az <= 360 (not used for static case) :param ZNE: whether output in 'ZNE'-coord, default is 'ZRT' :param calc_upar: whether calculate the spatial derivatives of displacements. :return: - **stream** - :class:`obspy.Stream` ''' if isinstance(st, Stream): if az > 360 or az < -360: raise ValueError(f"WRONG azimuth ({az})") return _gen_syn_from_gf(st, calc_upar, "COMPUTE_EX", M0, az, ZNE) elif isinstance(st, dict): return _gen_syn_from_static_gf(st, calc_upar, "COMPUTE_EX", M0, ZNE) else: raise NotImplementedError
[文档] def gen_syn_from_gf_MT(st:Union[Stream,dict], M0:float, MT:ArrayLike, az:float=-999, ZNE=False, calc_upar:bool=False): ''' Moment tensor :param st: Green's functions in a :class:`obspy.Stream` (dynamic-case) or a dict (static-case) :param M0: scalar seismic moment (dyne*cm) :param MT: coefficient of Moment tensor (M11, M12, M13, M22, M23, M33), subscripts 1,2,3 denote Northward,Eastward,Downward :param az: azimuth, 0 <= az <= 360 (not used for static case) :param ZNE: whether output in 'ZNE'-coord, default is 'ZRT' :param calc_upar: whether calculate the spatial derivatives of displacements. :return: - **stream** - :class:`obspy.Stream` ''' if isinstance(st, Stream): if az > 360 or az < -360: raise ValueError(f"WRONG azimuth ({az})") return _gen_syn_from_gf(st, calc_upar, "COMPUTE_MT", M0, az, ZNE, MT=MT) elif isinstance(st, dict): return _gen_syn_from_static_gf(st, calc_upar, "COMPUTE_MT", M0, ZNE, MT=MT) else: raise NotImplementedError
#================================================================================================================= # # 根据几何方程和本构方程合成应力、应变、旋转张量 # #================================================================================================================= def _compute_strain_rotation(st_syn:Stream, Type:str): r""" Compute dynamic strain/rotation tensor from synthetic spatial derivatives. :param st_syn: synthetic spatial derivatives. :param Type: "strain" or "rotation" :return: - **stream** - dynamic strain/rotation tensor, in :class:`obspy.Stream` class. """ if Type == 'strain': sgn = 1 i1_end = 3 i2_offset = 0 elif Type == 'rotation': sgn = -1 i1_end = 2 i2_offset = 1 else: raise ValueError(f"{Type} not supported.") chs = ZRTchs # 判断是否有标志性的trace if len(st_syn.select(channel=f"nN")) > 0: chs = ZNEchs dist = st_syn[0].stats.sac['dist'] # ---------------------------------------------------------------------------------- # 循环6/3个分量 stres = Stream() for i1 in range(i1_end): c1 = chs[i1] for i2 in range(i1+i2_offset, 3): c2 = chs[i2] channel = f"{c2.lower()}{c1}" st = st_syn.select(channel=channel) if len(st) == 0: raise NameError(f"{channel} not exists.") tr = st[0].copy() channel = f"{c1.lower()}{c2}" st = st_syn.select(channel=channel) if len(st) == 0: raise NameError(f"{channel} not exists.") tr.data = (tr.data + sgn*st[0].data) * 0.5 # 特殊情况加上协变导数 if c1=='R' and c2=='T': channel = f"T" st = st_syn.select(channel=channel) if len(st) == 0: raise NameError(f"{channel} not exists.") tr.data -= 0.5*st[0].data / dist * 1e-5 elif c1=='T' and c2=='T': channel = f"R" st = st_syn.select(channel=channel) if len(st) == 0: raise NameError(f"{channel} not exists.") tr.data += st[0].data / dist * 1e-5 # 修改通道名 tr.stats.channel = tr.stats.sac['kcmpnm'] = f"{c1}{c2}" stres.append(tr) return stres def _compute_static_strain_rotation(syn:dict, Type:str): r""" Compute static strain/rotation tensor from synthetic spatial derivatives. :param syn: synthetic spatial derivatives. :param Type: "strain" or "rotation" :return: - **res** - static strain/rotation tensor, in dict class. """ if Type == 'strain': sgn = 1 i1_end = 3 i2_offset = 0 elif Type == 'rotation': sgn = -1 i1_end = 2 i2_offset = 1 else: raise ValueError(f"{Type} not supported.") chs = ZRTchs # 判断是否有标志性的分量名 if f"nN" in syn.keys(): chs = ZNEchs xarr:np.ndarray = syn['_xarr'] yarr:np.ndarray = syn['_yarr'] # 结果字典 resDct = {} # 基本数据拷贝 for k in syn.keys(): if k[0] != '_': continue resDct[k] = deepcopy(syn[k]) # 6/3个分量建立数组 for i1 in range(i1_end): c1 = chs[i1] for i2 in range(i1+i2_offset, 3): c2 = chs[i2] channel = f"{c1}{c2}" resDct[channel] = np.zeros((len(xarr), len(yarr)), dtype='f8') for iy in range(len(yarr)): for ix in range(len(xarr)): # 震中距 dist = max(np.sqrt(xarr[ix]**2 + yarr[iy]**2), 1e-5) # ---------------------------------------------------------------------------------- # 循环6/3个分量 for i1 in range(i1_end): c1 = chs[i1] for i2 in range(i1+i2_offset, 3): c2 = chs[i2] channel = f"{c2.lower()}{c1}" v12 = syn[channel][ix, iy] channel = f"{c1.lower()}{c2}" v21 = syn[channel][ix, iy] val = 0.5*(v12 + sgn*v21) # 特殊情况加上协变导数 if c1=='R' and c2=='T': channel = f"T" v0 = syn[channel][ix, iy] val -= 0.5*v0 / dist * 1e-5 elif c1=='T' and c2=='T': channel = f"R" v0 = syn[channel][ix, iy] val += v0 / dist * 1e-5 channel = f"{c1}{c2}" resDct[channel][ix, iy] = val return resDct
[文档] def compute_strain(st:Union[Stream,dict]): r""" Compute dynamic/static strain tensor from synthetic spatial derivatives. :param st: synthetic spatial derivatives :class:`obspy.Stream` class for dynamic case, dict class for static case. :return: - **stres** - dynamic/static strain tensor, in :class:`obspy.Stream` class or dict class. """ if isinstance(st, Stream): return _compute_strain_rotation(st, "strain") elif isinstance(st, dict): return _compute_static_strain_rotation(st, "strain") else: raise NotImplementedError
[文档] def compute_rotation(st:Union[Stream,dict]): r""" Compute dynamic/static rotation tensor from synthetic spatial derivatives. :param st: synthetic spatial derivatives :class:`obspy.Stream` class for dynamic case, dict class for static case. :return: - **stres** - dynamic/static rotation tensor, in :class:`obspy.Stream` class or dict class. """ if isinstance(st, Stream): return _compute_strain_rotation(st, "rotation") elif isinstance(st, dict): return _compute_static_strain_rotation(st, "rotation") else: raise NotImplementedError
def _compute_stress(st_syn:Stream): r""" Compute dynamic stress tensor from synthetic spatial derivatives. :param st_syn: synthetic spatial derivatives. :return: - **stream** - dynamic stress tensor (unit: dyne/cm^2 = 0.1 Pa), in :class:`obspy.Stream` class. """ # 由于有Q值的存在,lambda和mu变成了复数,需在频域进行 chs = ZRTchs rot2ZNE:bool = False # 判断是否有标志性的trace if len(st_syn.select(channel=f"nN")) > 0: chs = ZNEchs rot2ZNE = True nt = st_syn[0].stats.npts dt = st_syn[0].stats.delta dist = st_syn[0].stats.sac['dist'] df = 1.0/(nt*dt) nf = nt//2 + 1 va = st_syn[0].stats.sac['user1'] vb = st_syn[0].stats.sac['user2'] rho = st_syn[0].stats.sac['user3'] Qainv = st_syn[0].stats.sac['user4'] Qbinv = st_syn[0].stats.sac['user5'] # 计算不同频率下的拉梅系数 mus = np.zeros((nf,), dtype='c16') lams = np.zeros((nf,), dtype='c16') omega = (REAL*2)(0.0, 0.0) atte = (REAL*2)(0.0, 0.0) for i in range(nf): freq = 0.01 if i==0 else df*i w = 2.0*np.pi*freq omega[0] = w C_grt_py_attenuation_law(Qbinv, omega, atte) attb = atte[0] + atte[1]*1j mus[i] = vb*vb*attb*attb*rho*1e10 C_grt_py_attenuation_law(Qainv, omega, atte) atta = atte[0] + atte[1]*1j lams[i] = va*va*atta*atta*rho*1e10 - 2.0*mus[i] del omega, atte # ---------------------------------------------------------------------------------- # 先计算体积应变u_kk = u_11 + u22 + u33 和 lamda的乘积 lam_ukk = np.zeros((nf,), dtype='c16') for i in range(3): c = chs[i] channel = f"{c.lower()}{c}" st = st_syn.select(channel=channel) if len(st) == 0: raise NameError(f"{channel} not exists.") lam_ukk[:] += rfft(st[0].data, nt) # 加上协变导数 if not rot2ZNE: channel = f"R" st = st_syn.select(channel=channel) if len(st) == 0: raise NameError(f"{channel} not exists.") lam_ukk[:] += rfft(st[0].data, nt) / dist * 1e-5 lam_ukk[:] *= lams # ---------------------------------------------------------------------------------- # 循环6个分量 stres = Stream() for i1 in range(3): c1 = chs[i1] for i2 in range(i1, 3): c2 = chs[i2] channel = f"{c2.lower()}{c1}" st = st_syn.select(channel=channel) if len(st) == 0: raise NameError(f"{channel} not exists.") tr = st[0].copy() fftarr = np.zeros((nf,), dtype='c16') channel = f"{c1.lower()}{c2}" st = st_syn.select(channel=channel) if len(st) == 0: raise NameError(f"{channel} not exists.") fftarr[:] = rfft(tr.data + st[0].data, nt) * mus # 对于对角线分量,需加上lambda * u_kk if c1==c2: fftarr[:] += lam_ukk # 特殊情况加上协变导数 if c1=='R' and c2=='T': channel = f"T" st = st_syn.select(channel=channel) if len(st) == 0: raise NameError(f"{channel} not exists.") fftarr[:] -= mus*rfft(st[0].data, nt) / dist * 1e-5 elif c1=='T' and c2=='T': channel = f"R" st = st_syn.select(channel=channel) if len(st) == 0: raise NameError(f"{channel} not exists.") fftarr[:] += 2.0*mus*rfft(st[0].data, nt) / dist * 1e-5 # 修改通道名 tr.stats.channel = tr.stats.sac['kcmpnm'] = f"{c1}{c2}" tr.data = irfft(fftarr, nt) stres.append(tr) return stres def _compute_static_stress(syn:dict): r""" Compute static stress tensor from synthetic spatial derivatives. :param syn: synthetic spatial derivatives. :return: - **res** - static stress tensor (unit: dyne/cm^2 = 0.1 Pa), in dict class. """ chs = ZRTchs rot2ZNE:bool = False # 判断是否有标志性的分量名 if f"nN" in syn.keys(): chs = ZNEchs rot2ZNE = True xarr:np.ndarray = syn['_xarr'] yarr:np.ndarray = syn['_yarr'] va = syn['_rcv_va'] vb = syn['_rcv_vb'] rho = syn['_rcv_rho'] mu = vb*vb*rho*1e10 lam = va*va*rho*1e10 - 2.0*mu # 结果字典 resDct = {} # 基本数据拷贝 for k in syn.keys(): if k[0] != '_': continue resDct[k] = deepcopy(syn[k]) # 6个分量建立数组 for i1 in range(3): c1 = chs[i1] for i2 in range(i1, 3): c2 = chs[i2] channel = f"{c1}{c2}" resDct[channel] = np.zeros((len(xarr), len(yarr)), dtype='f8') for iy in range(len(yarr)): for ix in range(len(xarr)): # 震中距 dist = max(np.sqrt(xarr[ix]**2 + yarr[iy]**2), 1e-5) # ---------------------------------------------------------------------------------- # 先计算体积应变u_kk = u_11 + u22 + u33 和 lamda的乘积 lam_ukk = 0.0 for i in range(3): c = chs[i] channel = f"{c.lower()}{c}" lam_ukk += syn[channel][ix, iy] # 加上协变导数 if not rot2ZNE: channel = f"R" lam_ukk += syn[channel][ix, iy] / dist * 1e-5 lam_ukk *= lam # ---------------------------------------------------------------------------------- # 循环6个分量 for i1 in range(3): c1 = chs[i1] for i2 in range(i1, 3): c2 = chs[i2] channel = f"{c2.lower()}{c1}" v12 = syn[channel][ix, iy] channel = f"{c1.lower()}{c2}" v21 = syn[channel][ix, iy] val = mu*(v12 + v21) # 对于对角线分量,需加上lambda * u_kk if c1==c2: val += lam_ukk # 特殊情况加上协变导数 if c1=='R' and c2=='T': channel = f"T" v0 = syn[channel][ix, iy] val -= mu*v0 / dist * 1e-5 elif c1=='T' and c2=='T': channel = f"R" v0 = syn[channel][ix, iy] val += 2.0*mu*v0 / dist * 1e-5 channel = f"{c1}{c2}" resDct[channel][ix, iy] = val return resDct
[文档] def compute_stress(st:Union[Stream,dict]): r""" Compute dynamic/static stress tensor from synthetic spatial derivatives. :param st: synthetic spatial derivatives :class:`obspy.Stream` class for dynamic case, dict class for static case. :return: - **stres** - dynamic/static stress tensor (unit: dyne/cm^2 = 0.1 Pa), in :class:`obspy.Stream` class or dict class. """ if isinstance(st, Stream): return _compute_stress(st) elif isinstance(st, dict): return _compute_static_stress(st) else: raise NotImplementedError
def __check_trace_attr_sac(tr:Trace, **kwargs): ''' 临时函数,检查trace中是否有sac字典,并将kwargs内容填入 ''' if hasattr(tr.stats, 'sac'): for k, v in kwargs.items(): tr.stats.sac[k] = v else: tr.stats.sac = AttribDict(**kwargs) #================================================================================================================= # # 卷积、微分、积分、保存SAC # #================================================================================================================= # def stream_convolve(st0:Stream, timearr:np.ndarray, inplace=True): # ''' # 频域实现线性卷 # ''' # st = st0 if inplace else deepcopy(st0) # sacAttr = st[0].stats.sac # try: # wI = sacAttr['user0'] # 虚频率 # except: # wI = 0.0 # nt = sacAttr['npts'] # dt = sacAttr['delta'] # nt2 = len(timearr) # N = nt+nt2-1 # nf = N//2 + 1 # wI_exp1 = np.exp(-wI*np.arange(0,nt)*dt) # wI_exp2 = np.exp( wI*np.arange(0,nt)*dt) # fft_tf = np.ones((nf, ), dtype='c16') # # if scale is None: # # scale = 1.0/np.trapz(timearr, dx=dt) # timearr0 = timearr.copy() # timearr0.resize((N,)) # 填充0 # timearr0[:nt] *= wI_exp1 # # FFT # fft_tf[:] = rfft(timearr0, N) # fft_tf[:] *= dt # # 对每一道做相同处理 # for tr in st: # data = tr.data # # 虚频率 # data[:] *= wI_exp1 # # FFT # fft_d = rfft(data, N) # # 卷积+系数 # fft_d[:] *= fft_tf # # IFFT # data[:] = irfft(fft_d, N)[:nt] # # 虚频率 # data[:] *= wI_exp2 # return st
[文档] def stream_convolve(st0:Stream, signal0:np.ndarray, inplace=True): ''' convolve each trace with a signal :param st0: :class:`obspy.Stream` :param signal0: convolution signal :param inplace: whether change in-place :return: - **stream** - convolution result, :class:`obspy.Stream` ''' st = st0 if inplace else deepcopy(st0) signal = deepcopy(signal0) for tr in st: data = tr.data dt = tr.stats.delta fac = None user_wI = hasattr(tr.stats, "sac") and "user0" in tr.stats.sac # 使用虚频率先压制 if user_wI: npts = tr.stats.npts wI = tr.stats.sac['user0'] fac = np.exp(np.arange(0, npts)*dt*wI) signal = deepcopy(signal0) signal[:] /= fac[:len(signal)] data[:] /= fac data1 = np.pad(data, (len(signal)-1, 0), mode='wrap') # 强制循环卷 data[:] = oaconvolve(data1, signal, mode='valid')[:data.shape[0]] * dt # dt是连续卷积的系数 if user_wI: data[:] *= fac return st
[文档] def stream_integral(st0:Stream, inplace=True): ''' Perform integration on each trace :param st0: :class:`obspy.Stream` :param inplace: whether change in-place :return: - **stream** - integration result, :class:`obspy.Stream` ''' st = st0 if inplace else deepcopy(st0) for tr in st: dt = tr.stats.delta data = tr.data lastx = data[0] data[0] = 0.0 for i in range(1, len(data)): tmp = data[i] data[i] = 0.5*(data[i] + lastx)*dt + data[i-1] lastx = tmp return st
[文档] def stream_diff(st0:Stream, inplace=True): ''' Perform central difference on each trace :param st0: :class:`obspy.Stream` :param inplace: whether change in-place :return: - **stream** - difference result, :class:`obspy.Stream` ''' st = st0 if inplace else deepcopy(st0) for tr in st: data = tr.data data[:] = np.gradient(data, tr.stats.delta) return st
[文档] def stream_write_sac(st:Stream, dir:str): ''' save each trace to "dir/{channel}.sac" :param st: :class:`obspy.Stream` :param dir: saving directory ''' # 新建对应文件夹 os.makedirs(dir, exist_ok=True) # 每一道的保存路径为 dir/{channel}.sac for tr in st: filepath = os.path.join(dir, f"{tr.stats.channel}.sac") tr.write(filepath, format='SAC')
#================================================================================================================= # # 积分过程文件读取及绘制 # #=================================================================================================================
[文档] def read_statsfile(statsfile:str): ''' read a statsfile :param statsfile: File path (Wildcards can be used to simplify input) :return: - **data** - `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_ custom type array ''' Lst = glob.glob(statsfile) if len(Lst) != 1: raise OSError(f"{statsfile} should only match one file, but {len(Lst)} matched.") statsfile = Lst[0] print(f"raed in {statsfile}.") basename = os.path.basename(statsfile) # 确定自定义数据类型 EX_q, EX_w, VF_q, ... dtype = [('k' if basename[0] == 'K' else 'c', NPCT_REAL_TYPE)] for im in range(SRC_M_NUM): modr = SRC_M_ORDERS[im] for c in range(QWV_NUM): if modr==0 and qwvchs[c] == 'v': continue dtype.append((f"{SRC_M_NAME_ABBR[im]}_{qwvchs[c]}", NPCT_CMPLX_TYPE)) data = np.fromfile(statsfile, dtype=dtype) return data
[文档] def read_kernels_freqs(statsdir:str, vels:Union[np.ndarray,None]=None, ktypes:Union[List[str],None]=None): r""" read all statsfiles in statsdir (except that of 0 frequency). If record wavenumber, interpolate to the phase velocity. :param statsdir: directory path :param vels: When a positive-order vels (km/s) is specified, files starting with `K_` are read and linear interpolation from wavenumber to phase velocity is performed. Otherwise read the files starting with `C_` :param ktype: Specify the return of a series of kernel function names, such as `EX_q`, `DS_w`, etc. By default, all are returned :return: - **kerDct** - kernel functions in a dict """ dointerp = vels is not None if (dointerp) and not np.all(np.diff(vels) > 0): raise ValueError("vels must be in ascending order.") K_statspaths = glob.glob(os.path.join(statsdir, "K_*")) if len(K_statspaths) == 0 and dointerp: raise ValueError("You want to interpolate from k to c, but found 0 statsfiles recording k.") C_statspaths = glob.glob(os.path.join(statsdir, "C_*")) if len(C_statspaths) == 0 and not dointerp: raise ValueError("Found 0 statsfiles directly recording c.") statspaths = K_statspaths if dointerp else C_statspaths KLst = np.array(statspaths) freqs = np.array([float(s.split("_")[-1]) for s in KLst]) # 根据freqs排序 _idx = np.argsort(freqs) freqs[:] = freqs[_idx] KLst[:] = KLst[_idx] del _idx # 去除零频 if freqs[0] == 0.0: freqs = freqs[1:] KLst = KLst[1:] kerDct = {} kerDct['_vels'] = vels.copy() if dointerp else [] kerDct['_freqs'] = freqs.copy() for i in range(len(freqs)): Kpath = KLst[i] freq = freqs[i] w = 2*np.pi*freq data = read_statsfile(Kpath) if dointerp: v = w/data['k'] # 检查v范围 v1 = np.min(v) v2 = np.max(v) if v1 > vels[0] or v2 < vels[-1]: raise ValueError(f"In freq={freq:.5e}, minV={v1:.5e}, maxV={v2:.5e}, insufficient wavenumber samples" " to interpolate on vels.") else: if len(kerDct['_vels']) == 0: kerDct['_vels'] = data['c'].copy() for key in data.dtype.names: if key == 'k' or key == 'c': continue if (ktypes is not None) and (key not in ktypes): continue if key not in kerDct.keys(): kerDct[key] = [] if dointerp: # 如果越界会报错 F = interpn((v,), data[key], vels) kerDct[key].append(F) else: kerDct[key].append(data[key]) # 将每个核函数结果拼成2D数组 for key in kerDct.keys(): if key[0] == '_': continue kerDct[key] = np.vstack(kerDct[key]) return kerDct
[文档] def read_statsfile_ptam(statsfile:str): ''' read a statsfile from PTAM process :param statsfile: File path (Wildcards can be used to simplify input) :return: - **data1** - `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_ custom type array, during DCM or (SA)FIM - **data2** - `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_ custom type array, during PTAM - **ptam_data** - `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_ custom type array, record the peak/trough from PTAM - **dist** - epicentral distance from the filename (km) ''' Lst = glob.glob(statsfile) if len(Lst) != 1: raise OSError(f"{statsfile} should only match one file, but {len(Lst)} matched.") statsfile = Lst[0] # 获得震中距 dist = float(os.path.dirname(statsfile).split("_")[-1]) # 从文件路径命名中,获得对应的K文件路径 PTAMname = os.path.basename(statsfile) if "_" in PTAMname: # 动态解 splits = PTAMname.split("_") splits[-3] = "K" K_basename= "_".join(splits) else: K_basename = "K" # 静态解 data1 = read_statsfile(os.path.join(os.path.dirname(os.path.dirname(statsfile)), K_basename)) data2 = read_statsfile(os.path.join(os.path.dirname(statsfile), K_basename)) # 确定自定义数据类型 sum_EX_0_k, sum_EX_0, sum_VF_0_k, ... # 各格林函数数值积分的值(k上限位于不同的波峰波谷) # 开头的sum表示这是波峰波谷位置处的数值积分的值(不含dk), # 末尾的k表示对应积分值的波峰波谷位置的k值 dtype = [] for im in range(SRC_M_NUM): modr = SRC_M_ORDERS[im] for v in range(INTEG_NUM): if modr==0 and v!=0 and v!=2: continue dtype.append((f"sum_{SRC_M_NAME_ABBR[im]}_{v}_k", NPCT_REAL_TYPE)) dtype.append((f"sum_{SRC_M_NAME_ABBR[im]}_{v}", NPCT_CMPLX_TYPE)) ptam_data = np.fromfile(statsfile, dtype=dtype) return data1, data2, ptam_data, dist
def _get_stats_Fname(statsdata:np.ndarray, karr:np.ndarray, dist:float, srctype:str, ptype:str): # 根据ptype获得对应的核函数 krarr = karr*dist # 从数组中找到震源名称的索引 try: _idx = SRC_M_NAME_ABBR.index(srctype) mtype = str(SRC_M_ORDERS[_idx]) except: raise ValueError(f"{srctype} is an invalid name.") if mtype=='0': if ptype=='0': Fname = rf"$F(k,\omega)=q^{{({srctype})}}(k, \omega)$" Farr = statsdata[f'{srctype}_q'] FJname = rf"$ - F(k,\omega)J_1(kr)k$" FJarr = - jv(1, krarr) * Farr * karr elif ptype=='2': Fname = rf"$F(k,\omega)=w^{{({srctype})}}(k, \omega)$" FJname = rf"$F(k,\omega)J_0(kr)k$" Farr = statsdata[f'{srctype}_w'] FJarr = jv(0, krarr) * Farr * karr else: raise ValueError(f"source {srctype}, m={mtype}, p={ptype} is not supported.") elif mtype in ['1', '2']: m = int(mtype) if ptype=='0': Fname = rf"$F(k,\omega)=q^{{({srctype})}}(k, \omega)$" Farr = statsdata[f'{srctype}_q'] FJname = rf"$F(k,\omega)J_{m-1}(kr)k$" FJarr = jv(m-1, krarr) * Farr * karr elif ptype=='1': Fname = rf"$F(k,\omega)=q^{{({srctype})}}(k, \omega) + v^{{({srctype})}}(k, \omega)$" Farr = (statsdata[f'{srctype}_q'] + statsdata[f'{srctype}_v']) FJname = rf"$ - F(k,\omega) \dfrac{{{m}}}{{kr}} J_{m}(kr)k$" FJarr = - jv(m, krarr) * Farr * m/dist elif ptype=='2': Fname = rf"$F(k,\omega)=w^{{({srctype})}}(k, \omega)$" Farr = statsdata[f'{srctype}_w'] FJname = rf"$F(k,\omega)J_{m}(kr)k$" FJarr = jv(m, krarr) * Farr * karr elif ptype=='3': Fname = rf"$F(k,\omega)=v^{{({srctype})}}(k, \omega)$" Farr = statsdata[f'{srctype}_v'] FJname = rf"$ - F(k,\omega)J_{m-1}(kr)k$" FJarr = - jv(m-1, krarr) * Farr * karr else: raise ValueError(f"source {srctype}, m={mtype}, p={ptype} is not supported.") else: raise ValueError(f"source {srctype}, m={mtype}, p={ptype} is not supported.") return Fname, Farr, FJname, FJarr
[文档] def plot_statsdata(statsdata:np.ndarray, dist:float, srctype:str, ptype:str, RorI:Union[bool,int]=True, fig:Union[Figure,None]=None, axs:Union[Axes,None]=None): r''' Based on the data read by the :func:`read_statsfile <pygrt.utils.read_statsfile>` function, plot the kernel function :math:`F(k,\omega)`, the integrand :math:`F(k,\omega)J_m(kr)k`, and calculate the cumulative integral :math:`\sum F(k,\omega)J_m(kr)k` . .. note:: Not every source type corresponds to every order and every integration type, see :ref:`grn_types` for details. :param statsdata: return value of :func:`read_statsfile <pygrt.utils.read_statsfile>` function :param dist: epicentral distance (km) :param srctype: abbreviation of source type, including EX, VF, HF, DD, DS, SS :param ptype: integration type (0,1,2,3) :param RorI: whether to plot real or imaginary part, default is real part, pass 2 to plot both :param fig: user-defined matplotlib.Figure object, default is None :param axs: user-defined matplotlib.Axes object array (three elements), default is None :return: - **fig** - matplotlib.Figure object - **(ax1,ax2,ax3)** - matplotlib.Axes object array ''' ptype = str(ptype) karr = statsdata['k'] dk = (karr[1] - karr[0]) # 假设均匀dk is_evendk = np.allclose(np.diff(karr), dk, atol=1e-10) # 是否为均匀dk if not is_evendk: raise ValueError("Sorry, this function only supports even-distributed k.") if 0.5*np.pi/dk < dist: # 对于bessel函数这种震荡函数,假设一个周期内至少取4个点 print(f"WARNING! dist ({dist}) > PI/(2*dk) ({0.5*np.pi/dk:.5e}.)") Fname, Farr, FJname, FJarr = _get_stats_Fname(statsdata, karr, dist, srctype, ptype) if fig is None or axs is None: fig, axs = plt.subplots(3, 1, figsize=(8, 9), gridspec_kw=dict(hspace=0.7)) # axs长度必须为三个 if len(axs) != 3: raise ValueError("axs should have 3 elements.") ax1, ax2, ax3 = axs if isinstance(RorI, int) and RorI==2: ax1.plot(karr, np.real(Farr), lw=0.8, label='Real') ax1.plot(karr, np.imag(Farr), lw=0.8, label='Imag') else: if RorI: ax1.plot(karr, np.real(Farr), lw=0.8, label='Real') else: ax1.plot(karr, np.imag(Farr), lw=0.8, label='Imag') ax1.set_xlabel('k /$km^{-1}$') ax1.set_title(Fname) ax1.grid() ax1.legend(loc='lower left') if isinstance(RorI, int) and RorI==2: ax2.plot(karr, np.real(FJarr), lw=0.8, label='Real') ax2.plot(karr, np.imag(FJarr), lw=0.8, label='Imag') else: if RorI: ax2.plot(karr, np.real(FJarr), lw=0.8, label='Real') else: ax2.plot(karr, np.imag(FJarr), lw=0.8, label='Imag') ax2.set_title(FJname) ax2.set_xlabel('k /$km^{-1}$') ax2.grid() ax2.legend(loc='lower left') # 数值积分,不乘系数dk Parr = np.cumsum(FJarr) if isinstance(RorI, int) and RorI==2: ax3.plot(karr, np.real(Parr), lw=0.8, label='Real') ax3.plot(karr, np.imag(Parr), lw=0.8, label='Imag') else: if RorI: ax3.plot(karr, np.real(Parr), lw=0.8, label='Real') else: ax3.plot(karr, np.imag(Parr), lw=0.8, label='Imag') ax3.set_title(f'$\sum_k$ {FJname}') ax3.set_xlabel("k /$km^{-1}$") ax3.grid() ax3.legend(loc='lower left') return fig, (ax1, ax2, ax3)
[文档] def plot_statsdata_ptam(statsdata1:np.ndarray, statsdata2:np.ndarray, statsdata_ptam:np.ndarray, dist:float, srctype:str, ptype:str, RorI:Union[bool,int]=True, fig:Union[Figure,None]=None, axs:Union[Axes,None]=None): r''' Based on data read by the :func:`read_statsfile_ptam <pygrt.utils.read_statsfile_ptam>` function, simply calculate and plot the cumulative integral as well as the peak/trough positions used by PTAM. .. note:: Not every source type corresponds to every order and every integration type, see :ref:`grn_types` for details. :param statsdata1: integral process data during DWM or FIM :param statsdata2: integral process data during PTAM :param statsdata_ptam: peak/trough positions and amplitudes from PTAM :param dist: epicentral distance (km) :param srctype: abbreviation of source type, including EX, VF, HF, DD, DS, SS :param ptype: integration type (0, 1, 2, 3) :param RorI: whether to plot real or imaginary part, default is real part, pass 2 to plot both :param fig: user-defined matplotlib.Figure object, default is None :param axs: user-defined matplotlib.Axes object array (three elements), default is None :return: - **fig** - matplotlib.Figure object - **(ax1, ax2, ax3)** - matplotlib.Axes object array ''' ptype = str(ptype) karr1 = statsdata1['k'] dk1 = karr1[1] - karr1[0] Fname, Farr1, FJname, FJarr1 = _get_stats_Fname(statsdata1, karr1, dist, srctype, ptype) karr2 = statsdata2['k'] dk2 = karr2[1] - karr2[0] Fname, Farr2, FJname, FJarr2 = _get_stats_Fname(statsdata2, karr2, dist, srctype, ptype) is_evendk = np.allclose(np.diff(karr1), dk1, atol=1e-10) and np.allclose(np.diff(karr2), dk2, atol=1e-10) # 是否为均匀dk if not is_evendk: raise ValueError("Sorry, this function only supports even-distributed k.") # 将两个过程的结果拼起来 Farr = np.hstack((Farr1, Farr2)) karr = np.hstack((karr1, karr2)) FJarr = np.hstack((FJarr1, FJarr2)) if fig is None or axs is None: fig, axs = plt.subplots(3, 1, figsize=(8, 9), gridspec_kw=dict(hspace=0.7)) # axs长度必须为三个 if len(axs) != 3: raise ValueError("axs should have 3 elements.") ax1, ax2, ax3 = axs if isinstance(RorI, int) and RorI==2: ax1.plot(karr, np.real(Farr), lw=0.8, label='Real') ax1.plot(karr, np.imag(Farr), lw=0.8, label='Imag') else: if RorI: ax1.plot(karr, np.real(Farr), lw=0.8, label='Real') else: ax1.plot(karr, np.imag(Farr), lw=0.8, label='Imag') ax1.set_xlabel('k /$km^{-1}$') ax1.set_title(Fname) ax1.grid() ax1.legend(loc='lower left') if isinstance(RorI, int) and RorI==2: ax2.plot(karr, np.real(FJarr), lw=0.8, label='Real') ax2.plot(karr, np.imag(FJarr), lw=0.8, label='Imag') else: if RorI: ax2.plot(karr, np.real(FJarr), lw=0.8, label='Real') else: ax2.plot(karr, np.imag(FJarr), lw=0.8, label='Imag') ax2.set_title(FJname) ax2.set_xlabel('k /$km^{-1}$') ax2.grid() ax2.legend(loc='lower left') # 波峰波谷位置,用红十字标记 ptKarr = statsdata_ptam[f'sum_{srctype}_{ptype}_k'] ptFJarr = statsdata_ptam[f'sum_{srctype}_{ptype}'] # 数值积分,不乘系数dk Parr1 = np.cumsum(FJarr1) Parr2 = np.cumsum(FJarr2) Parr = np.hstack([Parr1, Parr2*dk2/dk1+Parr1[-1]]) if isinstance(RorI, int) and RorI==2: ax3.plot(karr, np.real(Parr), lw=0.8, label='Real') ax3.plot(ptKarr, np.real(ptFJarr), 'r+', markersize=6) ax3.plot(karr, np.imag(Parr), lw=0.8, label='Imag') ax3.plot(ptKarr, np.imag(ptFJarr), 'r+', markersize=6) else: if RorI: ax3.plot(karr, np.real(Parr), lw=0.8, label='Real') ax3.plot(ptKarr, np.real(ptFJarr), 'r+', markersize=6) else: ax3.plot(karr, np.imag(Parr), lw=0.8, label='Imag') ax3.plot(ptKarr, np.imag(ptFJarr), 'r+', markersize=6) ax3.set_title(f'$\sum_k$ {FJname}') ax3.set_xlabel("k /$km^{-1}$") ax3.grid() ax3.legend(loc='lower left') return fig, (ax1, ax2, ax3)
[文档] def solve_lamb1(nu:float, ts:np.ndarray, azimuth:float): r""" solve the first-kind Lamb's problem using the generalized closed-form solution, see: 张海明, 冯禧 著. 2024. 地震学中的 Lamb 问题(下). 科学出版社 :param nu: possion ratio in (0, 0.5) :param ts: dimensionless time :math:`\bar{t}` ,:math:`\bar{t}=\dfrac{t}{r/\beta}` :param azimuth: azimuth in degree :return: Normalized solution with shape (nt, 3, 3). To get the physical solution, divide by :math:`\pi^2 \mu r` """ # 检查数据 if np.any(ts < 0.0): raise ValueError("ts should be nonnegative.") if azimuth < 0.0 or azimuth > 360.0: raise ValueError("azimuth should be in [0, 360].") ts = np.array(ts).astype(NPCT_REAL_TYPE) # 定义结果数组 nt = len(ts) u = np.zeros((nt, 3, 3), dtype=NPCT_REAL_TYPE) C_grt_solve_lamb1(nu, npct.as_ctypes(ts), nt, azimuth, npct.as_ctypes(u.ravel())) return u