es_sfgtools.workflows.workflow_handler module

class es_sfgtools.workflows.workflow_handler.WorkflowHandler(directory: Path | str = None)

Bases: WorkflowABC

Handles data operations including searching, adding, downloading, and processing.

This class extends WorkflowABC to provide comprehensive workflow management capabilities including data ingestion, processing pipelines, and analysis tools for seafloor geodesy workflows.

ingest_add_local_data(directory_path: Path) None

Scans a directory for data files and adds them to the catalog.

Parameters:

directory_path (Path) – The path to the directory to scan.

ingest_catalog_archive_data() None

Updates the data catalog with the S3 URIs for data hosted in EarthScope’s remote archive for the current network, station, and campaign.

Notes

This method does not download any data files. It only updates the catalog with remote file paths. See ingest_download_archive_data to download files.

ingest_download_archive_data(file_types: List[AssetType] | List[str] | None = [DOWNLOAD_TYPES.SONARDYNE, DOWNLOAD_TYPES.NOVATEL, DOWNLOAD_TYPES.NOVATEL000, DOWNLOAD_TYPES.NOVATEL770, DOWNLOAD_TYPES.DFPO00, DOWNLOAD_TYPES.CTD, DOWNLOAD_TYPES.SEABIRD]) None

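Downloads data files from the EarthScope archive based on the current catalog entries.

Parameters:

file_types (Optional[Union[List[AssetType], List[str]]], optional) – The file types to download, by default the DOWNLOAD_TYPES members listed in the signature.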

Notes

This method requires that the catalog has been populated with remote file paths using ingest_catalog_archive_data.
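The ordering described in the notes above can be sketched as a small helper. This is only an illustration: ingest_from_archive is a hypothetical name, and workflow is assumed to be a WorkflowHandler whose network, station, and campaign are already set.

```python
from typing import Any, List, Optional


def ingest_from_archive(workflow: Any, file_types: Optional[List[str]] = None) -> None:
    """Catalog remote files first, then download them.

    Hypothetical helper: `workflow` is assumed to expose the two
    ingest_* methods documented above.
    """
    # Step 1: record the remote S3 URIs in the catalog (no files are fetched).
    workflow.ingest_catalog_archive_data()

    # Step 2: download the cataloged files. When no file types are given,
    # omit the argument so the method's own default list applies.
    if file_types is None:
        workflow.ingest_download_archive_data()
    else:
        workflow.ingest_download_archive_data(file_types=file_types)
```

Calling the download step without a prior catalog update would leave it with no remote paths to work from, which is why the helper always runs both steps in this order.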

midprocess_get_processor(site_metadata: Site | str | None = None, override_metadata_require: bool = False) IntermediateDataProcessor

Returns an instance of the IntermediateDataProcessor for the current station.

Parameters:
  • site_metadata (Optional[Union[Site, str]], optional) – Optional site metadata or path to metadata file. If not provided, it will be loaded if available.

  • override_metadata_require (bool, optional) – If True, bypasses the requirement for loaded site metadata, by default False.

Returns:

An instance of IntermediateDataProcessor.

Return type:

IntermediateDataProcessor

Raises:

ValueError – If site metadata is not loaded and override_metadata_require is False.

midprocess_get_sitemeta(site_metadata: Site | str | None = None) Site

Loads and returns the site metadata for the current station. Sets the current_station_metadata attribute.

  1. If site_metadata is None, attempts to load from data_handler’s current_station_metadata.

  2. If site_metadata is a string or Path, loads the site metadata from the file.

  3. If site_metadata is already a Site instance, uses it directly.

Parameters:

site_metadata (Optional[Union[Site, str]], optional) – Optional site metadata or path to metadata file. If not provided, it will be loaded if available.

Returns:

The site metadata.

Return type:

Site

Raises:

ValueError – If site metadata cannot be loaded or is not provided.
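The three resolution cases listed for midprocess_get_sitemeta can be sketched as follows. This is not the library's implementation: Site here is a hypothetical stand-in dataclass, resolve_site_metadata and its cached argument are illustrative names, and the JSON file format is an assumption.

```python
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Union


@dataclass
class Site:
    """Hypothetical stand-in for the real Site metadata class."""
    name: str


def resolve_site_metadata(
    site_metadata: Optional[Union[Site, str, Path]],
    cached: Optional[Site] = None,
) -> Site:
    # Case 3: already a Site instance, so use it directly.
    if isinstance(site_metadata, Site):
        return site_metadata
    # Case 2: a string or Path, so load the metadata from that file.
    if isinstance(site_metadata, (str, Path)):
        # Assumption: a JSON file with a single "name" key.
        with open(site_metadata) as fh:
            return Site(**json.load(fh))
    # Case 1: nothing passed, so fall back to previously loaded metadata.
    if cached is not None:
        return cached
    raise ValueError("Site metadata cannot be loaded or is not provided.")
```

The cached argument plays the role of the data handler's current_station_metadata; when neither source yields metadata, the sketch raises ValueError, matching the documented behaviour.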

midprocess_parse_surveys(site_metadata: Site | str | None = None, override: bool = False, write_intermediate: bool = False, survey_id: str | None = None) IntermediateDataProcessor

Parses survey data for the current station.

Parameters:
  • site_metadata (Optional[Union[Site, str]], optional) – Optional site metadata or path to metadata file. If not provided, it will be loaded if available.

  • override (bool, optional) – If True, re-parses existing data, by default False.

  • write_intermediate (bool, optional) – If True, writes intermediate files to disk, by default False.

  • survey_id (Optional[str], optional) – Optional survey identifier to process. If None, processes all surveys, by default None.

Raises:

ValueError – If site metadata is not loaded.

midprocess_prep_garpos(site_metadata: Site | str | None = None, survey_id: str | None = None, custom_filters: dict | None = None, override: bool = False, override_survey_parsing: bool = False, write_intermediate: bool = False) None

Prepares data for GARPOS processing.

Parameters:
  • site_metadata (Optional[Union[Site, str]], optional) – Optional site metadata or path to metadata file. If not provided, it will be loaded if available.

  • survey_id (Optional[str], optional) – Optional survey identifier to process. If None, processes all surveys, by default None.

  • custom_filters (dict, optional) – Custom filter settings for shot data preparation, by default None.

  • override (bool, optional) – If True, re-prepares existing data, by default False.

  • write_intermediate (bool, optional) – If True, writes intermediate files, by default False.

Raises:

ValueError – If site metadata is not loaded.

midprocess_upload_s3(overwrite: bool = False, override_metadata_require: bool = False) None

Uploads intermediate processed data to S3 for the current station.

Parameters:
  • overwrite (bool, optional) – If True, overwrites existing data on S3, by default False.

  • override_metadata_require (bool, optional) – If True, bypasses the requirement for loaded site metadata, by default False.

Raises:

ValueError – If site metadata is not loaded and override_metadata_require is False.

modeling_get_garpos_handler() GarposHandler

Returns an instance of the GarposHandler for the current station.

Returns:

An instance of GarposHandler.

Return type:

GarposHandler

Raises:

ValueError – If site metadata is not loaded.

modeling_plot_garpos_results(survey_id: str | None = None, run_id: str = 'Test', residuals_filter: float | None = 10, save_fig: bool = True, show_fig: bool = False) None

Plots the time series results for a given survey.

Parameters:
  • survey_id (str, optional) – ID of the survey to plot results for, by default None.

  • run_id (str, optional) – The run ID of the survey results to plot, by default 'Test'.

  • residuals_filter (float, optional) – Residual threshold (m) used to filter out extreme values, by default 10.

  • save_fig (bool, optional) – If True, save the figure, by default True.

  • show_fig (bool, optional) – If True, display the figure, by default False.

modeling_run_garpos(survey_id: str | None = None, run_id: str = 'Test', iterations: int = 1, override: bool = False, custom_settings: dict | None = None) None

Runs GARPOS processing for the current station.

Parameters:
  • survey_id (Optional[str], optional) – Optional survey identifier to process. If None, processes all surveys, by default None.

  • run_id (str, optional) – Identifier for the GARPOS run, by default 'Test'.

  • iterations (int, optional) – Number of GARPOS iterations to perform, by default 1.

  • override (bool, optional) – If True, re-runs GARPOS even if results exist, by default False.

  • custom_settings (Optional[dict], optional) – Custom settings to override GARPOS defaults, by default None.

Raises:

ValueError – If site metadata is not loaded.
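Because modeling_plot_garpos_results looks up results by run_id, a run and its plot have to share the same identifier. A minimal sketch of that pairing, assuming workflow is a WorkflowHandler with site metadata already loaded (run_and_plot_garpos is a hypothetical helper name):

```python
from typing import Any, Optional


def run_and_plot_garpos(
    workflow: Any,
    survey_id: Optional[str] = None,
    run_id: str = "Test",
    iterations: int = 1,
) -> None:
    """Run GARPOS, then plot the results of that same run."""
    # Run the inversion under the given run_id.
    workflow.modeling_run_garpos(
        survey_id=survey_id, run_id=run_id, iterations=iterations
    )
    # Plot using the identical run_id so the plot reflects this run.
    workflow.modeling_plot_garpos_results(
        survey_id=survey_id, run_id=run_id, save_fig=True, show_fig=False
    )
```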

preprocess_get_pipeline_sv3(primary_config: SV3PipelineConfig | PrideCLIConfig | NovatelConfig | RinexConfig | DFOP00Config | PositionUpdateConfig | dict | None = None, secondary_config: SV3PipelineConfig | PrideCLIConfig | NovatelConfig | RinexConfig | DFOP00Config | PositionUpdateConfig | dict | None = None) SV3Pipeline

Creates and configures an SV3 processing pipeline.

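Parameters:
  • primary_config (Optional[Union[SV3PipelineConfig, dict]], optional) – Primary configuration to override defaults.

  • secondary_config (Optional[Union[SV3PipelineConfig, dict]], optional) – Secondary configuration to override primary and defaults.

Returns: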

Configured SV3Pipeline instance.

Return type:

SV3Pipeline

Raises:
  • AssertionError – If current network, station, or campaign is not set.

  • ValueError – If configuration validation fails.

See also

es_sfgtools.pipelines.sv3_pipeline.SV3Pipeline

The pipeline class used for processing.

preprocess_run_pipeline_sv3(job: Literal['all', 'process_novatel', 'build_rinex', 'run_pride', 'process_kinematic', 'process_dfop00', 'refine_shotdata', 'process_svp'] = 'all', primary_config: SV3PipelineConfig | PrideCLIConfig | NovatelConfig | RinexConfig | DFOP00Config | PositionUpdateConfig | dict | None = None, secondary_config: SV3PipelineConfig | PrideCLIConfig | NovatelConfig | RinexConfig | DFOP00Config | PositionUpdateConfig | dict | None = None) None

Runs the SV3 processing pipeline with optional configuration overrides.

This method creates and configures an SV3Pipeline instance using the data_handler to access the directory structure and catalog.

Parameters:
  • job (Literal["all", "process_novatel", "build_rinex", "run_pride", "process_kinematic", "process_dfop00", "refine_shotdata", "process_svp"], optional) – The specific job to run within the pipeline, by default “all”.

  • primary_config (Optional[Union[SV3PipelineConfig, dict]], optional) – Primary configuration to override defaults.

  • secondary_config (Optional[Union[SV3PipelineConfig, dict]], optional) – Secondary configuration to override primary and defaults.

Raises:
  • AssertionError – If job is not in valid pipeline jobs.

  • ValueError – If configuration validation fails.

See also

preprocess_get_pipeline_sv3

Method that creates the pipeline instance.

es_sfgtools.pipelines.sv3_pipeline.SV3Pipeline

The pipeline class used.

es_sfgtools.data_mgmt.data_handler.DataHandler

Data management dependency.

Examples

# Run the SV3 pipeline with a custom Novatel processing configuration
>>> workflow = WorkflowHandler("/path/to/data")
>>> workflow.change_working_station("network", "station", "campaign")
>>> workflow.preprocess_run_pipeline_sv3(
...     job="process_novatel",
...     primary_config={"novatel_config": {"n_processes": 8}}
... )
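The documented precedence (defaults, then primary_config, then secondary_config) and the job check can be illustrated with plain dictionaries. This is a sketch under stated assumptions, not the library's code: validate_job, deep_merge, and layer_configs are hypothetical helpers, the recursive merge is an assumed semantics, and the "verbose" key is invented for the example.

```python
from copy import deepcopy
from typing import Any, Dict, Literal, Optional, get_args

# The job names listed in the method signature.
Job = Literal[
    "all", "process_novatel", "build_rinex", "run_pride",
    "process_kinematic", "process_dfop00", "refine_shotdata", "process_svp",
]


def validate_job(job: str) -> str:
    # Mirror the documented contract: an invalid job raises AssertionError.
    assert job in get_args(Job), f"unknown job: {job!r}"
    return job


def deep_merge(
    base: Dict[str, Any], override: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
    """Return a copy of `base` with `override` applied recursively."""
    merged = deepcopy(base)
    for key, value in (override or {}).items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Both sides are dicts: merge key by key instead of replacing.
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


def layer_configs(
    defaults: Dict[str, Any],
    primary: Optional[Dict[str, Any]] = None,
    secondary: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    # Precedence, lowest to highest: defaults < primary < secondary.
    return deep_merge(deep_merge(defaults, primary), secondary)


# "verbose" is a hypothetical key used only for illustration.
defaults = {"novatel_config": {"n_processes": 4, "verbose": False}}
merged = layer_configs(
    defaults,
    primary={"novatel_config": {"n_processes": 8}},
    secondary={"novatel_config": {"verbose": True}},
)
print(merged)
```

Under this assumed merge, a partial override such as {"novatel_config": {"n_processes": 8}} changes only the named key and leaves the remaining defaults in place.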

set_network_station_campaign(network_id: str, station_id: str, campaign_id: str)

Sets the current network, station, and campaign.

Delegates to DataHandler which handles both its own setup and parent context switching. Then syncs WorkflowHandler-specific state.

Parameters:
  • network_id (str) – The ID of the network to set.

  • station_id (str) – The ID of the station to set.

  • campaign_id (str) – The ID of the campaign to set.