Source code for multiply_data_access.locally_wrapped_data_access

"""
Description
===========

This module contains functionality for wrapping file systems or meta info providers with locally available file systems
or meta info providers. This is done with the objective to facilitate the handling of data that needs to be downloaded
(or is in another way hard to access) by putting it in a local data store.
by
"""

from abc import abstractmethod
import logging
from multiply_core.util import FileRef
from multiply_data_access.data_access import DataSetMetaInfo, FileSystem, MetaInfoProvider
from multiply_data_access.local_file_system import LocalFileSystem
from multiply_data_access.json_meta_info_provider import JsonMetaInfoProvider
from typing import List, Sequence

__author__ = 'Tonio Fincke (Brockmann Consult GmbH)'


[docs]class LocallyWrappedFileSystem(FileSystem): def __init__(self, parameters: dict): if 'path' not in parameters.keys(): raise ValueError('Missing parameter \'path\'') if 'pattern' not in parameters.keys(): raise ValueError('Missing parameter \'pattern\'') self._local_file_system = LocalFileSystem(parameters['path'], parameters['pattern']) self._init_wrapped_file_system(parameters)
[docs] @abstractmethod def _init_wrapped_file_system(self, parameters: dict) -> None: """Initializes the file system wrapped by the LocallyWrappingFileSystem. To be called instead of __init__"""
def get(self, data_set_meta_info: DataSetMetaInfo) -> Sequence[FileRef]: file_refs = self._local_file_system.get(data_set_meta_info) if len(file_refs) > 0: return file_refs file_refs_from_wrapped = self._get_from_wrapped(data_set_meta_info) if len(file_refs_from_wrapped) == 0: return [] self._local_file_system.put(file_refs_from_wrapped[0].url, data_set_meta_info) self._notify_copied_to_local(data_set_meta_info) return self._local_file_system.get(data_set_meta_info)
[docs] @abstractmethod def _get_from_wrapped(self, data_set_meta_info: DataSetMetaInfo) -> Sequence[FileRef]: """Retrieves the file ref from the wrapped file system."""
[docs] @abstractmethod def _notify_copied_to_local(self, data_set_meta_info: DataSetMetaInfo) -> None: """Called when the data set has been copied to the local file system."""
def get_parameters_as_dict(self) -> dict: local_parameters = self._local_file_system.get_parameters_as_dict() wrapped_parameters = self._get_wrapped_parameters_as_dict() local_parameters.update(wrapped_parameters) return local_parameters
[docs] @abstractmethod def _get_wrapped_parameters_as_dict(self) -> dict: """ :return: The parameters of this wrapped file system as dict """
def can_put(self) -> bool: return False def put(self, from_url: str, data_set_meta_info: DataSetMetaInfo) -> DataSetMetaInfo: raise UserWarning('Method not supported') def remove(self, data_set_meta_info: DataSetMetaInfo): raise UserWarning('Method not supported') def scan(self) -> Sequence[DataSetMetaInfo]: logging.info('Scanning local file system, not remote') return self._local_file_system.scan()
class LocallyWrappedMetaInfoProvider(MetaInfoProvider): def __init__(self, parameters: dict): if 'path_to_json_file' not in parameters.keys(): raise ValueError('Missing path to json file') if 'supported_data_types' in parameters.keys(): provided_data_types = parameters['supported_data_types'] else: provided_data_types = ','.join(self.get_provided_data_types()) self._json_meta_info_provider = JsonMetaInfoProvider(parameters['path_to_json_file'], provided_data_types) self._init_wrapped_meta_info_provider(parameters) @abstractmethod def _init_wrapped_meta_info_provider(self, parameters: dict) -> None: """Initializes the meta info provider wrapped by the LocallyWrappingMetaInfoProvider. To be called instead of __init__""" def query(self, query_string: str) -> List[DataSetMetaInfo]: local_data_meta_set_infos = self._json_meta_info_provider.query(query_string) wrapped_data_set_meta_infos = self._query_wrapped_meta_info_provider(query_string, local_data_meta_set_infos) to_be_appended = [] for wrapped_data_set_meta_info in wrapped_data_set_meta_infos: already_contained = False for local_data_meta_set_info in local_data_meta_set_infos: if wrapped_data_set_meta_info.equals(local_data_meta_set_info): already_contained = True break if not already_contained: to_be_appended.append(wrapped_data_set_meta_info) for data_set_meta_info in to_be_appended: local_data_meta_set_infos.append(data_set_meta_info) return local_data_meta_set_infos @abstractmethod def _query_wrapped_meta_info_provider(self, query_string: str, local_data_set_meta_infos: List[DataSetMetaInfo]) \ -> List[DataSetMetaInfo]: """Queries a wrapped file system.""" def _get_parameters_as_dict(self) -> dict: local_parameters = self._json_meta_info_provider._get_parameters_as_dict() del local_parameters['supported_data_types'] wrapped_parameters = self._get_wrapped_parameters_as_dict() local_parameters.update(wrapped_parameters) return local_parameters @abstractmethod def _get_wrapped_parameters_as_dict(self) -> dict: """ :return: The parameters of this wrapped meta info provider as dict """ def notify_got(self, data_set_meta_info: DataSetMetaInfo): self._json_meta_info_provider.update(data_set_meta_info) def can_update(self) -> bool: return True def update(self, data_set_meta_info: DataSetMetaInfo): logging.info('Updating local meta info provider, not remote') if self.provides_data_type(data_set_meta_info.data_type): self._json_meta_info_provider.update(data_set_meta_info) def remove(self, data_set_meta_info: DataSetMetaInfo): self._json_meta_info_provider.remove(data_set_meta_info) def get_all_data(self) -> Sequence[DataSetMetaInfo]: return self._json_meta_info_provider.get_all_data()