es_sfgtools.workflows.workflow_handler module
- class es_sfgtools.workflows.workflow_handler.WorkflowHandler(directory: Path | str = None)
Bases: WorkflowABC
Handles data operations including searching, adding, downloading, and processing.
This class extends WorkflowABC with methods for data ingestion (ingest_*), preprocessing pipelines (preprocess_*), intermediate processing (midprocess_*), and GARPOS modeling and plotting (modeling_*) in seafloor geodesy workflows.
- ingest_add_local_data(directory_path: Path) → None
Scans a directory for data files and adds them to the catalog.
- Parameters:
directory_path (Path) – The path to the directory to scan.
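How ingest_add_local_data decides which files count as data files is not specified here. The following is a minimal sketch of a recursive scan, assuming any regular file outside hidden (dot-prefixed) paths is a candidate; scan_data_files is hypothetical and does not reproduce the library's file-type detection or catalog insertion.

```python
from pathlib import Path


def scan_data_files(directory_path: Path) -> list[Path]:
    """Hypothetical sketch: list regular, non-hidden files under a directory."""
    directory_path = Path(directory_path)
    if not directory_path.is_dir():
        raise NotADirectoryError(f"{directory_path} is not a directory")
    files = []
    for path in sorted(directory_path.rglob("*")):
        # Skip directories and anything under a dot-prefixed path component.
        rel_parts = path.relative_to(directory_path).parts
        if path.is_file() and not any(part.startswith(".") for part in rel_parts):
            files.append(path)
    return files
```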
- ingest_catalog_archive_data() → None
Updates the data catalog with the S3 URIs of data hosted in Earthscope’s remote archive for the current network, station, and campaign.
Notes
This method does not download any data files. It only updates the catalog with remote file paths. See ingest_download_archive_data to download files.
- ingest_download_archive_data(file_types: List[AssetType] | List[str] | None = [DOWNLOAD_TYPES.SONARDYNE, DOWNLOAD_TYPES.NOVATEL, DOWNLOAD_TYPES.NOVATEL000, DOWNLOAD_TYPES.NOVATEL770, DOWNLOAD_TYPES.DFPO00, DOWNLOAD_TYPES.CTD, DOWNLOAD_TYPES.SEABIRD]) → None
Downloads data files from the Earthscope archive based on the current catalog entries.
Notes
This method requires that the catalog has been populated with remote file paths using ingest_catalog_archive_data.
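file_types accepts either AssetType members or plain strings. One way such an argument could be normalized before filtering catalog entries is sketched below. FileType is a hypothetical stand-in for AssetType, its member values are illustrative rather than the library's, and treating None as "use the defaults" is an assumption of this sketch.

```python
from enum import Enum
from typing import Iterable, Optional, Union


class FileType(str, Enum):
    """Hypothetical stand-in for AssetType; values are illustrative only."""
    SONARDYNE = "sonardyne"
    NOVATEL = "novatel"
    CTD = "ctd"


DEFAULT_TYPES = [FileType.SONARDYNE, FileType.NOVATEL, FileType.CTD]


def normalize_file_types(
    file_types: Optional[Iterable[Union[FileType, str]]] = None,
) -> list[FileType]:
    """Coerce enum members or strings to FileType; None falls back to the defaults."""
    if file_types is None:
        # Return a copy so callers cannot mutate the shared default list.
        return list(DEFAULT_TYPES)
    normalized = []
    for item in file_types:
        # FileType(value) raises ValueError for an unknown string.
        normalized.append(item if isinstance(item, FileType) else FileType(str(item).lower()))
    return normalized
```

Using None as the default, rather than a list literal, sidesteps Python's shared-mutable-default pitfall.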
- midprocess_get_processor(site_metadata: Site | str | None = None, override_metadata_require: bool = False) → IntermediateDataProcessor
Returns an instance of the IntermediateDataProcessor for the current station.
- Parameters:
site_metadata (Optional[Union[Site, str]], optional) – Optional site metadata or path to metadata file. If not provided, it will be loaded if available.
override_metadata_require (bool, optional) – If True, bypasses the requirement for loaded site metadata, by default False.
- Returns:
An instance of IntermediateDataProcessor.
- Return type:
IntermediateDataProcessor
- Raises:
ValueError – If site metadata is not loaded and override_metadata_require is False.
- midprocess_get_sitemeta(site_metadata: Site | str | None = None) → Site
Loads and returns the site metadata for the current station. Sets the current_station_metadata attribute.
If site_metadata is None, attempts to load from data_handler’s current_station_metadata.
If site_metadata is a string or Path, loads the site metadata from the file.
If site_metadata is already a Site instance, uses it directly.
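The three cases above amount to a type dispatch. A minimal sketch, assuming the metadata file is JSON and using a hypothetical SiteStub dataclass in place of the real Site model, whose loader is not documented here. Raising ValueError when nothing is available mirrors the Raises entries of the neighbouring methods but is an assumption for this one.

```python
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Union


@dataclass
class SiteStub:
    """Hypothetical stand-in for the Site metadata model."""
    names: list


def resolve_site_metadata(
    site_metadata: Union[SiteStub, str, Path, None],
    fallback: Optional[SiteStub],
) -> SiteStub:
    """Return a SiteStub following the None / path / instance rules."""
    if site_metadata is None:
        # Fall back to metadata already held by the data handler.
        if fallback is None:
            raise ValueError("No site metadata loaded and none provided.")
        return fallback
    if isinstance(site_metadata, (str, Path)):
        # Assumption: the file is JSON whose keys match SiteStub's fields.
        data = json.loads(Path(site_metadata).read_text())
        return SiteStub(**data)
    if isinstance(site_metadata, SiteStub):
        return site_metadata
    raise TypeError(f"Unsupported site_metadata type: {type(site_metadata)!r}")
```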
- midprocess_parse_surveys(site_metadata: Site | str | None = None, override: bool = False, write_intermediate: bool = False, survey_id: str | None = None) → IntermediateDataProcessor
Parses survey data for the current station.
- Parameters:
site_metadata (Optional[Union[Site, str]], optional) – Optional site metadata or path to metadata file. If not provided, it will be loaded if available.
override (bool, optional) – If True, re-parses existing data, by default False.
write_intermediate (bool, optional) – If True, writes intermediate files to disk, by default False.
survey_id (Optional[str], optional) – Optional survey identifier to process. If None, processes all surveys, by default None.
- Raises:
ValueError – If site metadata is not loaded.
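Several methods take survey_id with the convention that None means every survey. A minimal sketch of that selection rule, assuming surveys are keyed by ID in a mapping; select_surveys is hypothetical, and raising KeyError for an unknown ID is an assumption, since the methods' behavior in that case is not documented here.

```python
from typing import Mapping, Optional, TypeVar

T = TypeVar("T")


def select_surveys(surveys: Mapping[str, T], survey_id: Optional[str] = None) -> dict[str, T]:
    """Return all surveys when survey_id is None, otherwise only the requested one."""
    if survey_id is None:
        return dict(surveys)
    if survey_id not in surveys:
        raise KeyError(f"Unknown survey_id: {survey_id!r}")
    return {survey_id: surveys[survey_id]}
```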
- midprocess_prep_garpos(site_metadata: Site | str | None = None, survey_id: str | None = None, custom_filters: dict | None = None, override: bool = False, override_survey_parsing: bool = False, write_intermediate: bool = False) → None
Prepares data for GARPOS processing.
- Parameters:
site_metadata (Optional[Union[Site, str]], optional) – Optional site metadata or path to metadata file. If not provided, it will be loaded if available.
survey_id (Optional[str], optional) – Optional survey identifier to process. If None, processes all surveys, by default None.
custom_filters (dict, optional) – Custom filter settings for shot data preparation, by default None.
override (bool, optional) – If True, re-prepares existing data, by default False.
write_intermediate (bool, optional) – If True, writes intermediate files, by default False.
- Raises:
ValueError – If site metadata is not loaded.
- midprocess_upload_s3(overwrite: bool = False, override_metadata_require: bool = False) → None
Uploads intermediate processed data to S3 for the current station.
- Parameters:
overwrite (bool, optional) – If True, overwrites existing data on S3, by default False.
override_metadata_require (bool, optional) – If True, bypasses the requirement for loaded site metadata, by default False.
- Raises:
ValueError – If site metadata is not loaded and override_metadata_require is False.
- modeling_get_garpos_handler() → GarposHandler
Returns an instance of the GarposHandler for the current station.
- Returns:
An instance of GarposHandler.
- Return type:
GarposHandler
- Raises:
ValueError – If site metadata is not loaded.
- modeling_plot_garpos_results(survey_id: str | None = None, run_id: str = 'Test', residuals_filter: float | None = 10, save_fig: bool = True, show_fig: bool = False) → None
Plots the time series results for a given survey.
- Parameters:
survey_id (str, optional) – ID of the survey to plot results for, by default None.
run_id (str, optional) – The run ID of the survey results to plot, by default 'Test'.
residuals_filter (float, optional) – Residual threshold (m) used to filter out outlier values, by default 10.
save_fig (bool, optional) – If True, save the figure, by default True.
show_fig (bool, optional) – If True, display the figure, by default False.
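residuals_filter is documented only as a threshold in meters. A minimal sketch of one plausible reading, assuming samples whose absolute residual exceeds the threshold are dropped and that None disables filtering; filter_residuals is hypothetical and not the plotting code's actual implementation.

```python
from typing import Optional, Sequence


def filter_residuals(
    times: Sequence[float],
    residuals_m: Sequence[float],
    residuals_filter: Optional[float] = 10,
) -> tuple[list[float], list[float]]:
    """Drop samples whose |residual| exceeds residuals_filter (meters)."""
    if len(times) != len(residuals_m):
        raise ValueError("times and residuals_m must have the same length")
    if residuals_filter is None:
        # No threshold: keep every sample.
        return list(times), list(residuals_m)
    kept = [(t, r) for t, r in zip(times, residuals_m) if abs(r) <= residuals_filter]
    return [t for t, _ in kept], [r for _, r in kept]
```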
- modeling_run_garpos(survey_id: str | None = None, run_id: str = 'Test', iterations: int = 1, override: bool = False, custom_settings: dict | None = None) → None
Runs GARPOS processing for the current station.
- Parameters:
survey_id (Optional[str], optional) – Optional survey identifier to process. If None, processes all surveys, by default None.
run_id (str, optional) – Identifier for the GARPOS run, by default 'Test'.
iterations (int, optional) – Number of GARPOS iterations to perform, by default 1.
override (bool, optional) – If True, re-runs GARPOS even if results exist, by default False.
custom_settings (Optional[dict], optional) – Custom settings to override GARPOS defaults, by default None.
- Raises:
ValueError – If site metadata is not loaded.
- preprocess_get_pipeline_sv3(primary_config: SV3PipelineConfig | PrideCLIConfig | NovatelConfig | RinexConfig | DFOP00Config | PositionUpdateConfig | dict | None = None, secondary_config: SV3PipelineConfig | PrideCLIConfig | NovatelConfig | RinexConfig | DFOP00Config | PositionUpdateConfig | dict | None = None) → SV3Pipeline
Creates and configures an SV3 processing pipeline.
- Parameters:
primary_config (Optional[Union[SV3PipelineConfig, PrideCLIConfig, NovatelConfig, RinexConfig, DFOP00Config, PositionUpdateConfig, dict]], optional) – Optional primary configuration for the pipeline.
secondary_config (Optional[Union[SV3PipelineConfig, PrideCLIConfig, NovatelConfig, RinexConfig, DFOP00Config, PositionUpdateConfig, dict]], optional) – Optional secondary configuration for the pipeline.
- Returns:
Configured SV3Pipeline instance.
- Return type:
SV3Pipeline
- Raises:
AssertionError – If current network, station, or campaign is not set.
ValueError – If configuration validation fails.
See also
es_sfgtools.pipelines.sv3_pipeline.SV3Pipeline – The pipeline class used for processing.
- preprocess_run_pipeline_sv3(job: Literal['all', 'process_novatel', 'build_rinex', 'run_pride', 'process_kinematic', 'process_dfop00', 'refine_shotdata', 'process_svp'] = 'all', primary_config: SV3PipelineConfig | PrideCLIConfig | NovatelConfig | RinexConfig | DFOP00Config | PositionUpdateConfig | dict | None = None, secondary_config: SV3PipelineConfig | PrideCLIConfig | NovatelConfig | RinexConfig | DFOP00Config | PositionUpdateConfig | dict | None = None) → None
Runs the SV3 processing pipeline with optional configuration overrides.
This method creates and configures an SV3Pipeline instance, using the data_handler to access the directory structure and catalog.
- Parameters:
job (Literal["all", "process_novatel", "build_rinex", "run_pride", "process_kinematic", "process_dfop00", "refine_shotdata", "process_svp"], optional) – The specific job to run within the pipeline, by default “all”.
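primary_config (Optional[Union[SV3PipelineConfig, PrideCLIConfig, NovatelConfig, RinexConfig, DFOP00Config, PositionUpdateConfig, dict]], optional) – Primary configuration to override defaults.
secondary_config (Optional[Union[SV3PipelineConfig, PrideCLIConfig, NovatelConfig, RinexConfig, DFOP00Config, PositionUpdateConfig, dict]], optional) – Secondary configuration to override both the primary configuration and defaults.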
- Raises:
AssertionError – If job is not one of the valid pipeline jobs.
ValueError – If configuration validation fails.
See also
preprocess_get_pipeline_sv3 – Method that creates the pipeline instance.
es_sfgtools.pipelines.sv3_pipeline.SV3Pipeline – The pipeline class used.
es_sfgtools.data_mgmt.data_handler.DataHandler – Data management dependency.
Examples
Run the SV3 pipeline with a custom Novatel processing configuration:
>>> workflow = WorkflowHandler("/path/to/data")
>>> workflow.set_network_station_campaign("network", "station", "campaign")
>>> workflow.preprocess_run_pipeline_sv3(
...     job="process_novatel",
...     primary_config={"novatel_config": {"n_processes": 8}},
... )
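The documented precedence is defaults, then primary_config, then secondary_config, with an AssertionError for an unknown job. A minimal sketch of that rule on plain dicts, assuming nested sections (such as the "novatel_config" key in the example) are merged key by key rather than replaced wholesale. resolve_pipeline_config, deep_merge, and the "other_section" key used in testing are hypothetical; the real method validates through pipeline config classes that are not reproduced here.

```python
import copy
from typing import Any, Optional

VALID_JOBS = (
    "all", "process_novatel", "build_rinex", "run_pride",
    "process_kinematic", "process_dfop00", "refine_shotdata", "process_svp",
)


def deep_merge(base: dict, override: dict) -> dict:
    """Return a copy of base with override applied recursively."""
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Both sides are sections: merge them key by key.
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


def resolve_pipeline_config(
    job: str,
    defaults: dict,
    primary: Optional[dict] = None,
    secondary: Optional[dict] = None,
) -> dict[str, Any]:
    """Validate job, then layer primary and secondary overrides over defaults."""
    assert job in VALID_JOBS, f"Invalid job {job!r}; expected one of {VALID_JOBS}"
    config = deep_merge(defaults, primary or {})
    return deep_merge(config, secondary or {})
```

Copying rather than mutating keeps the caller's default configuration reusable across runs.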
- set_network_station_campaign(network_id: str, station_id: str, campaign_id: str)
Sets the current network, station, and campaign.
Delegates to DataHandler, which handles both its own setup and the parent context switch, and then syncs WorkflowHandler-specific state.
- Parameters:
network_id (str) – The ID of the network to set.
station_id (str) – The ID of the station to set.
campaign_id (str) – The ID of the campaign to set.
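The delegate-then-sync order described above can be sketched with two small classes. This is a minimal sketch under stated assumptions: DataHandlerStub, WorkflowStub, change_context, and current_station are hypothetical and do not mirror es_sfgtools internals, and clearing current_station_metadata on a context change is an assumption. It only illustrates that the data handler switches context first and the workflow then copies the resulting state.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class DataHandlerStub:
    """Hypothetical stand-in for DataHandler's context-switching behavior."""
    network_id: Optional[str] = None
    station_id: Optional[str] = None
    campaign_id: Optional[str] = None

    def change_context(self, network_id: str, station_id: str, campaign_id: str) -> None:
        # Hypothetical method name; records the new working context.
        self.network_id, self.station_id, self.campaign_id = network_id, station_id, campaign_id


class WorkflowStub:
    """Hypothetical stand-in showing the delegate-then-sync pattern."""

    def __init__(self) -> None:
        self.data_handler = DataHandlerStub()
        self.current_station: Optional[str] = None
        self.current_station_metadata: Optional[Any] = None

    def set_network_station_campaign(self, network_id: str, station_id: str, campaign_id: str) -> None:
        # 1. Delegate: the data handler owns the authoritative context.
        self.data_handler.change_context(network_id, station_id, campaign_id)
        # 2. Sync: copy workflow-specific state from the handler.
        self.current_station = self.data_handler.station_id
        # Assumption: cached site metadata belonged to the previous station.
        self.current_station_metadata = None
```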