diff --git a/corva/__init__.py b/corva/__init__.py index e69de29b..5cc0bea6 100644 --- a/corva/__init__.py +++ b/corva/__init__.py @@ -0,0 +1,4 @@ +from .application import Corva # noqa: F401 +from .models.stream import StreamEvent # noqa: F401 +from .network.api import Api # noqa: F401 +from .state.redis_state import RedisState as Cache # noqa: F401 diff --git a/corva/app/base.py b/corva/app/base.py index 190ff89a..502a8906 100644 --- a/corva/app/base.py +++ b/corva/app/base.py @@ -1,27 +1,23 @@ from abc import ABC, abstractmethod from itertools import groupby -from logging import Logger, LoggerAdapter -from typing import List, Optional, Union +from typing import List, Optional -from corva import settings from corva.event import Event -from corva.logger import DEFAULT_LOGGER from corva.models.base import BaseContext from corva.network.api import Api +from corva.settings import CORVA_SETTINGS class BaseApp(ABC): def __init__( self, - app_key: str = settings.APP_KEY, - cache_url: str = settings.CACHE_URL, - api: Optional[Api] = None, - logger: Union[Logger, LoggerAdapter] = DEFAULT_LOGGER + app_key: str = CORVA_SETTINGS.APP_KEY, + cache_url: str = CORVA_SETTINGS.CACHE_URL, + api: Optional[Api] = None ): self.app_key = app_key self.cache_url = cache_url - self.api = api or Api() - self.logger = logger + self.api = api @property @abstractmethod @@ -42,7 +38,6 @@ def run(self, event: str) -> None: event = self.event_loader.load(event=event) events = self._group_event(event=event) except Exception: - self.logger.error('Could not prepare events for run.') raise for event in events: @@ -52,7 +47,6 @@ def _run(self, event: Event) -> None: try: context = self.get_context(event=event) except Exception: - self.logger.error('Could not get context.') raise try: @@ -60,7 +54,6 @@ def _run(self, event: Event) -> None: self.process(context=context) self.post_process(context=context) except Exception as exc: - self.logger.error('An error occurred in process pipeline.') self.on_fail(context=context, exception=exc) raise diff --git a/corva/app/scheduled.py b/corva/app/scheduled.py deleted file mode 100644 index 4dfc28d1..00000000 --- a/corva/app/scheduled.py +++ /dev/null @@ -1,35 +0,0 @@ -from corva.app.base import BaseApp -from corva.event import Event -from corva.models.scheduled import ScheduledContext, ScheduledEventData -from corva.state.redis_adapter import RedisAdapter -from corva.state.redis_state import RedisState -from corva.utils import GetStateKey - - -class ScheduledApp(BaseApp): - group_by_field = 'app_connection_id' - - @property - def event_loader(self): - return - - def get_context(self, event: Event) -> ScheduledContext: - return ScheduledContext( - event=event, - state=RedisState( - redis=RedisAdapter( - default_name=GetStateKey.from_event(event=event, app_key=self.app_key), - cache_url=self.cache_url, - logger=self.logger - ), - logger=self.logger - ) - ) - - def post_process(self, context: ScheduledContext) -> None: - for data in context.event: # type: ScheduledEventData - self.update_schedule_status(schedule=data.schedule, status='completed') - - def update_schedule_status(self, schedule: int, status: str) -> dict: - response = self.api.post(path=f'scheduler/{schedule}/{status}') - return response diff --git a/corva/app/stream.py b/corva/app/stream.py deleted file mode 100644 index bdf3de33..00000000 --- a/corva/app/stream.py +++ /dev/null @@ -1,126 +0,0 @@ -from itertools import chain -from typing import Optional, List - -from corva.app.base import BaseApp -from corva.event import 
Event -from corva.models.stream import StreamContext, StreamEventData -from corva.state.redis_adapter import RedisAdapter -from corva.state.redis_state import RedisState -from corva.utils import GetStateKey - - -class StreamApp(BaseApp): - DEFAULT_LAST_PROCESSED_VALUE = -1 - - group_by_field = 'app_connection_id' - - def __init__(self, filter_by_timestamp: bool = False, filter_by_depth: bool = False, *args, **kwargs): - super().__init__(*args, **kwargs) - self.filter_by_timestamp = filter_by_timestamp - self.filter_by_depth = filter_by_depth - - @property - def event_loader(self): - return - - def get_context(self, event: Event) -> StreamContext: - return StreamContext( - event=event, - state=RedisState( - redis=RedisAdapter( - default_name=GetStateKey.from_event(event=event, app_key=self.app_key), - cache_url=self.cache_url, - logger=self.logger - ), - logger=self.logger - ) - ) - - def pre_process(self, context: StreamContext) -> None: - last_processed_timestamp = ( - int(context.state.load(key='last_processed_timestamp') or self.DEFAULT_LAST_PROCESSED_VALUE) - if self.filter_by_timestamp - else self.DEFAULT_LAST_PROCESSED_VALUE - ) - - last_processed_depth = ( - float(context.state.load(key='last_processed_depth') or self.DEFAULT_LAST_PROCESSED_VALUE) - if self.filter_by_depth - else self.DEFAULT_LAST_PROCESSED_VALUE - ) - - event = self._filter_event( - event=context.event, - last_processed_timestamp=last_processed_timestamp, - last_processed_depth=last_processed_depth - ) - - context.event = event - - def post_process(self, context: StreamContext) -> None: - all_records: List[StreamEventData.Record] = list(chain(*[subdata.records for subdata in context.event])) - - last_processed_timestamp = max( - [ - record.timestamp - for record in all_records - if record.timestamp is not None - ], - default=None - ) - last_processed_depth = max( - [ - record.measured_depth - for record in all_records - if record.measured_depth is not None - ], - default=None - ) - - mapping = {} - if last_processed_timestamp is not None: - mapping['last_processed_timestamp'] = last_processed_timestamp - if last_processed_depth is not None: - mapping['last_processed_depth'] = last_processed_depth - - context.state.store(mapping=mapping) - - @classmethod - def _filter_event( - cls, - event: Event, - last_processed_timestamp: Optional[int], - last_processed_depth: Optional[float] - ) -> Event: - data = [] - for subdata in event: # type: StreamEventData - data.append( - cls._filter_event_data( - data=subdata, - last_processed_timestamp=last_processed_timestamp, - last_processed_depth=last_processed_depth - ) - ) - - return Event(data) - - @staticmethod - def _filter_event_data( - data: StreamEventData, - last_processed_timestamp: Optional[int] = None, - last_processed_depth: Optional[float] = None - ) -> StreamEventData: - records = data.records - - if data.is_completed: - records = records[:-1] # remove "completed" record - - new_records = [] - for record in records: - if last_processed_timestamp is not None and record.timestamp <= last_processed_timestamp: - continue - if last_processed_depth is not None and record.measured_depth <= last_processed_depth: - continue - new_records.append(record) - - return data.copy(update={'records': new_records}, deep=True) diff --git a/corva/application.py b/corva/application.py index 4b077326..dd4d449b 100644 --- a/corva/application.py +++ b/corva/application.py @@ -1,2 +1,83 @@ +from typing import Any, Callable, List, Optional + +from corva.models.scheduled import 
ScheduledContext, ScheduledEvent +from corva.models.stream import StreamContext, StreamEvent +from corva.network.api import Api +from corva.runners.scheduled import scheduled_runner +from corva.runners.stream import stream_runner +from corva.settings import CorvaSettings, CORVA_SETTINGS + + class Corva: - pass + def __init__( + self, + api_url: Optional[str] = None, + data_api_url: Optional[str] = None, + cache_url: Optional[str] = None, + api_key: Optional[str] = None, + app_key: Optional[str] = None, + api_timeout: Optional[int] = None, + api_max_retries: Optional[int] = None, + cache_kwargs: Optional[dict] = None + ): + self.cache_kwargs = cache_kwargs or {} + + self.settings = CorvaSettings( + API_ROOT_URL=api_url or CORVA_SETTINGS.API_ROOT_URL, + DATA_API_ROOT_URL=data_api_url or CORVA_SETTINGS.DATA_API_ROOT_URL, + API_KEY=api_key or CORVA_SETTINGS.API_KEY, + CACHE_URL=cache_url or CORVA_SETTINGS.CACHE_URL, + APP_KEY=app_key or CORVA_SETTINGS.APP_KEY + ) + + self.api = Api( + api_url=self.settings.API_ROOT_URL, + data_api_url=self.settings.DATA_API_ROOT_URL, + api_key=self.settings.API_KEY, + app_name=self.settings.APP_NAME, + timeout=api_timeout, + max_retries=api_max_retries + ) + + def stream( + self, + fn: Callable, + event: str, + *, + filter_by_timestamp: bool = False, + filter_by_depth: bool = False + ) -> List[Any]: + events = StreamEvent.from_raw_event(event=event, app_key=self.settings.APP_KEY) + + results = [] + + for event in events: + ctx = StreamContext( + event=event, + settings=self.settings, + api=self.api, + cache_kwargs=self.cache_kwargs, + filter_by_timestamp=filter_by_timestamp, + filter_by_depth=filter_by_depth + ) + + results.append(stream_runner(fn=fn, context=ctx)) + + return results + + def scheduled(self, fn: Callable, event: str): + events = ScheduledEvent.from_raw_event(event=event) + + results = [] + + for event in events: + ctx = ScheduledContext( + event=event, + settings=self.settings, + api=self.api, + cache_kwargs=self.cache_kwargs + ) + + results.append(scheduled_runner(fn=fn, context=ctx)) + + return results diff --git a/corva/logger.py b/corva/logger.py deleted file mode 100644 index 83816473..00000000 --- a/corva/logger.py +++ /dev/null @@ -1,58 +0,0 @@ -from logging import LoggerAdapter, Formatter, getLogger -from logging.config import dictConfig -from time import gmtime - -from corva import settings - - -class UtcFormatter(Formatter): - converter = gmtime - - -dictConfig( - { - 'version': 1, - 'formatters': { - 'default': { - '()': UtcFormatter, - 'format': '%(asctime)s %(name)-5s %(levelname)-5s %(message)s' - } - }, - 'handlers': { - 'stream': { - 'class': 'logging.StreamHandler', - 'level': settings.LOG_LEVEL, - 'formatter': 'default', - 'stream': 'ext://sys.stdout' - } - }, - 'loggers': { - 'main': { - 'level': settings.LOG_LEVEL, - 'handlers': ['stream'], - 'propagate': False - } - } - } -) - - -class LogAdapter(LoggerAdapter): - extra_fields = [] - - def process(self, msg, kwargs): - message_parts = [ - f'[{field}:{self.extra[field]}]' - for field in self.extra_fields - if field in self.extra - ] - message_parts.append(str(msg)) - message = ' '.join(message_parts) - return message, kwargs - - -class AppLogger(LogAdapter): - extra_fields = ['asset_id'] - - -DEFAULT_LOGGER = getLogger('main') diff --git a/corva/models/base.py b/corva/models/base.py index 2a778eff..c1a8606c 100644 --- a/corva/models/base.py +++ b/corva/models/base.py @@ -1,47 +1,86 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import 
Any, List, Optional, TypeVar +from typing import Any, Generic, List, Optional, Type, TypeVar, Union -from pydantic import BaseModel, Extra +import pydantic +from pydantic.generics import GenericModel from corva.network.api import Api +from corva.settings import CorvaSettings +from corva.state.redis_adapter import RedisAdapter from corva.state.redis_state import RedisState class BaseEvent(ABC): @staticmethod @abstractmethod - def from_raw_event(event: str, **kwargs) -> BaseEvent: + def from_raw_event(event: str, **kwargs) -> Union[List[BaseEvent], BaseEvent]: pass -class BaseContext(BaseModel): - """Stores common data for running a Corva app.""" +class CorvaModelConfig: + allow_population_by_field_name = True + arbitrary_types_allowed = True + extra = pydantic.Extra.allow + validate_assignment = True + + +class CorvaBaseModel(pydantic.BaseModel): + Config = CorvaModelConfig + + +class CorvaGenericModel(GenericModel): + Config = CorvaModelConfig + - class Config: - arbitrary_types_allowed = True - extra = Extra.allow +BaseEventTV = TypeVar('BaseEventTV', bound=BaseEvent) +CorvaBaseModelTV = TypeVar('CorvaBaseModelTV', bound=CorvaBaseModel) - raw_event: str - app_key: str - event: Optional[BaseEvent] = None - api: Optional[Api] = None - state: Optional[RedisState] = None +class BaseContext(CorvaGenericModel, Generic[BaseEventTV, CorvaBaseModelTV]): + """Stores common data for running a Corva app.""" + + event: BaseEventTV + settings: CorvaSettings + api: Api + _cache: Optional[RedisState] = None + user_result: Any = None + # cache params + cache_kwargs: dict = {} + cache_data_cls: Optional[Type[CorvaBaseModelTV]] = None + + @property + def cache_key(self) -> str: + return ( + f'{self.settings.PROVIDER}/well/{self.event.asset_id}/stream/{self.event.app_stream_id}/' + f'{self.settings.APP_KEY}/{self.event.app_connection_id}' + ) + + @property + def cache(self) -> RedisState: + if self._cache is not None: + return self._cache -class BaseEventData(BaseModel): - class Config: - extra = Extra.allow - allow_population_by_field_name = True + redis_adapter = RedisAdapter( + name=self.cache_key, + cache_url=self.settings.CACHE_URL, + **self.cache_kwargs + ) + self._cache = RedisState(redis=redis_adapter) -BaseEventDataTV = TypeVar('BaseEventDataTV', bound=BaseEventData) + return self._cache + @property + def cache_data(self) -> CorvaBaseModelTV: + state_data_dict = self.cache.load_all() + return self.cache_data_cls(**state_data_dict) -class ListEvent(BaseEvent, List[BaseEventDataTV]): - """Base class for list events (events that consist of more than one event data).""" + def store_cache_data(self, cache_data: CorvaBaseModelTV) -> int: + if cache_data := cache_data.dict(exclude_defaults=True, exclude_none=True): + return self.cache.store(mapping=cache_data) - pass + return 0 diff --git a/corva/models/scheduled.py b/corva/models/scheduled.py index 562f3967..ceeb924f 100644 --- a/corva/models/scheduled.py +++ b/corva/models/scheduled.py @@ -6,15 +6,10 @@ import pydantic -from corva.models.base import BaseContext, BaseEventData, ListEvent -from corva.state.redis_state import RedisState +from corva.models.base import BaseContext, BaseEvent, CorvaBaseModel -class ScheduledContext(BaseContext): - state: RedisState - - -class ScheduledEventData(BaseEventData): +class ScheduledEventData(CorvaBaseModel): type: Optional[str] = None collection: Optional[str] = None cron_string: str @@ -42,13 +37,17 @@ class ScheduledEventData(BaseEventData): day_shift_start: Optional[str] = None -class 
ScheduledEvent(ListEvent[ScheduledEventData]):
+class ScheduledEvent(BaseEvent, ScheduledEventData):
     @staticmethod
-    def from_raw_event(event: str, **kwargs) -> ScheduledEvent:
-        parsed = pydantic.parse_raw_as(List[List[ScheduledEventData]], event)
+    def from_raw_event(event: str, **kwargs) -> List[ScheduledEvent]:
+        events = pydantic.parse_raw_as(List[List[ScheduledEvent]], event)
+
+        # raw event from the queue comes in the form of a 2d array of ScheduledEvent
+        # flatten the parsed event into a 1d array of ScheduledEvent, which is the expected return type
+        events = list(itertools.chain(*events))
+
+        return events
 
-        # raw event from queue comes in from of 2d array of datas
-        # flatten parsed event into 1d array of datas, which is expected by ScheduledEvent
-        parsed = list(itertools.chain(*parsed))
 
-        return ScheduledEvent(parsed)
+
+class ScheduledContext(BaseContext[ScheduledEvent, CorvaBaseModel]):
+    pass
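
For context, a minimal sketch of how the flattened list returned by from_raw_event is consumed
end to end (the function names below are illustrative, not part of this change):

    from corva import Api, Cache, Corva

    def my_scheduled_app(event, api: Api, cache: Cache):
        ...  # called once per ScheduledEvent parsed from the raw payload

    def lambda_handler(event, context):
        # returns a list with one result per parsed event
        return Corva().scheduled(my_scheduled_app, event)
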
diff --git a/corva/models/stream.py b/corva/models/stream.py index 696b79cd..1e0953c4 100644 --- a/corva/models/stream.py +++ b/corva/models/stream.py @@ -1,24 +1,19 @@
 from __future__ import annotations
 
-from typing import Dict, List, Optional
+from typing import Dict, List, Optional, Type
 
 import pydantic
 
-from corva.models.base import BaseContext, BaseEventData, ListEvent
-from corva.state.redis_state import RedisState
+from corva.models.base import BaseContext, BaseEvent, CorvaBaseModel
 
 
-class StreamContext(BaseContext):
-    state: RedisState
-
-
-class RecordData(BaseEventData):
+class RecordData(CorvaBaseModel):
     hole_depth: Optional[float] = None
     weight_on_bit: Optional[int] = None
     state: Optional[str] = None
 
 
-class Record(BaseEventData):
+class Record(CorvaBaseModel):
     timestamp: Optional[int] = None
     asset_id: int
     company_id: int
@@ -28,24 +23,44 @@ class Record(BaseEventData):
     data: RecordData
 
 
-class AppMetadata(BaseEventData):
+class AppMetadata(CorvaBaseModel):
     app_connection_id: int
     app_version: Optional[int] = None
 
 
-class StreamEventMetadata(BaseEventData):
+class StreamEventMetadata(CorvaBaseModel):
     app_stream_id: int
+    source_type: Optional[str] = None
     apps: Dict[str, AppMetadata]
 
 
-class StreamEventData(BaseEventData):
+class StreamEventData(CorvaBaseModel):
     app_key: Optional[str] = None
     records: List[Record]
     metadata: StreamEventMetadata
+    asset_id: int = None
 
-    @property
-    def asset_id(self) -> int:
-        return self.records[0].asset_id
+    @pydantic.validator('asset_id', pre=True, always=True)
+    def set_asset_id(cls, v, values):
+        """dynamically sets the value of asset_id
+
+        asset_id could've been defined as a property like below:
+
+        @property
+        def asset_id(self) -> Optional[int]:
+            return self.records[0].asset_id if self.records else None
+
+        The issue with the above approach: after filtering we may end up with
+        empty records, which would make asset_id None. Using this validator we
+        set and store the value of asset_id once, no matter what happens to
+        records afterwards.
+        """
+
+        records = values['records']  # type: List[Record]
+        if records:
+            return records[0].asset_id
+
+        raise ValueError('Can\'t determine asset_id as records are empty (which should not happen).')
 
     @property
     def app_connection_id(self) -> int:
@@ -63,14 +78,52 @@ def is_completed(self) -> bool:
         return False
 
 
-class StreamEvent(ListEvent[StreamEventData]):
+class StreamEvent(BaseEvent, StreamEventData):
+    @staticmethod
+    def from_raw_event(event: str, **kwargs) -> List[StreamEvent]:
+        app_key = kwargs['app_key']
+
+        events = pydantic.parse_raw_as(List[StreamEvent], event)  # type: List[StreamEvent]
+
+        for event in events:
+            event.app_key = app_key
+
+        return events
+
     @staticmethod
-    def from_raw_event(event: str, **kwargs) -> StreamEvent:
-        app_key: str = kwargs['app_key']
+    def filter(
+        event: StreamEvent, by_timestamp: bool, by_depth: bool, last_timestamp: int, last_depth: float
+    ) -> StreamEvent:
+        records = event.records
+
+        if event.is_completed:
+            records = records[:-1]  # remove "completed" record
+
+        new_records = []
+        for record in records:
+            if by_timestamp and record.timestamp <= last_timestamp:
+                continue
+            if by_depth and record.measured_depth <= last_depth:
+                continue
+
+            new_records.append(record)
+
+        return event.copy(update={'records': new_records}, deep=True)
+
+
+class StreamStateData(CorvaBaseModel):
+    last_processed_timestamp: Optional[int] = None
+    last_processed_depth: Optional[float] = None
+
 
-        parsed = pydantic.parse_raw_as(List[StreamEventData], event)  # type: List[StreamEventData]
+class StreamContext(BaseContext[StreamEvent, StreamStateData]):
+    cache_data_cls: Type[StreamStateData] = StreamStateData
+    filter_by_timestamp: bool = False
+    filter_by_depth: bool = False
 
-        for data in parsed:
-            data.app_key = app_key
+    @pydantic.root_validator(pre=True)
+    def check_one_active_filter_at_most(cls, values):
+        if values.get('filter_by_timestamp') and values.get('filter_by_depth'):
+            raise ValueError('filter_by_timestamp and filter_by_depth can\'t be set to True together.')
 
-        return StreamEvent(parsed)
+        return values
diff --git a/corva/models/task.py b/corva/models/task.py index ed6b3715..9564f3d9 100644 --- a/corva/models/task.py +++ b/corva/models/task.py @@ -6,7 +6,7 @@
 import pydantic
 from pydantic.types import conint
 
-from corva.models.base import BaseContext, BaseEventData, BaseEvent
+from corva.models.base import BaseContext, BaseEvent, CorvaBaseModel
 
 
 class TaskStatus(Enum):
@@ -37,12 +37,7 @@ class UpdateTaskData(pydantic.BaseModel):
     payload: dict = {}
 
 
-class TaskContext(BaseContext):
-    task: TaskData
-    task_result: dict = {}
-
-
-class TaskEventData(BaseEventData):
+class TaskEventData(CorvaBaseModel):
     id: Optional[str] = None
     task_id: str
     version: conint(ge=2, le=2)  # only utils API v2 supported
@@ -52,3 +47,7 @@ class TaskEvent(BaseEvent, TaskEventData):
     @staticmethod
     def from_raw_event(event: str, **kwargs) -> TaskEvent:
         return pydantic.parse_raw_as(TaskEvent, event)
+
+
+class TaskContext(BaseContext[TaskEvent, CorvaBaseModel]):
+    pass
diff --git a/corva/network/api.py b/corva/network/api.py index 3a593b31..5f2190a3 100644 --- a/corva/network/api.py +++ b/corva/network/api.py @@ -6,23 +6,23 @@
 from requests.adapters import HTTPAdapter
 from urllib3 import Retry
 
-from corva import settings
-
 
 class Api:
     ALLOWED_METHODS = {'GET', 'POST', 'PATCH', 'PUT', 'DELETE'}
+    DEFAULT_TIMEOUT = 600
+    DEFAULT_MAX_RETRIES = 3
 
     def __init__(
         self,
-        api_url: str = settings.API_ROOT_URL,
-        data_api_url: str = settings.DATA_API_ROOT_URL,
-        api_key: str = settings.API_KEY,
-        app_name: 
str = settings.APP_NAME, - timeout: int = 600, - max_retries: int = 3 + api_url: str, + data_api_url: str, + api_key: str, + app_name: str, + timeout: Optional[int] = None, + max_retries: Optional[int] = None ): - self.timeout = timeout - self.max_retries = max_retries + self.timeout = timeout or self.DEFAULT_TIMEOUT + self.max_retries = max_retries or self.DEFAULT_MAX_RETRIES self.api_url = api_url self.data_api_url = data_api_url self.api_key = api_key @@ -54,20 +54,25 @@ def _init_session(api_key: str, app_name: str, max_retries: int, allowed_methods return session - def get(self, path: str, **kwargs): - return self._request('GET', path, **kwargs) + @property + def get(self): + return self._request('GET') - def post(self, path: str, **kwargs): - return self._request('POST', path, **kwargs) + @property + def post(self): + return self._request('POST') - def patch(self, path: str, **kwargs): - return self._request('PATCH', path, **kwargs) + @property + def patch(self): + return self._request('PATCH') - def put(self, path: str, **kwargs): - return self._request('PUT', path, **kwargs) + @property + def put(self): + return self._request('PUT') - def delete(self, path: str, **kwargs): - return self._request('DELETE', path, **kwargs) + @property + def delete(self): + return self._request('DELETE') def _get_url(self, path: str): # search text like api/v1/data or api/v1/message_producer in path @@ -78,35 +83,36 @@ def _get_url(self, path: str): return os.path.join(base_url.strip('/'), path.strip('/')) - def _request( - self, - method: str, - path: str, - data: Optional[dict] = None, # request body - params: Optional[dict] = None, # url query string params - headers: Optional[dict] = None, # additional headers to include in request - max_retries: Optional[int] = None, # custom value for max number of retries - timeout: Optional[int] = None, # request timeout in seconds - ) -> Response: - - if method not in self.ALLOWED_METHODS: - raise ValueError(f'Invalid HTTP method {method}.') - - max_retries = max_retries or self.max_retries - timeout = timeout or self.timeout - - # not thread safe - self.session.adapters['https://'].max_retries.total = max_retries - - response = self.session.request( - method=method, - url=self._get_url(path=path), - params=params, - json=data, - headers=headers, - timeout=timeout - ) + def _request(self, method: str): + def _request_helper( + path: str, + *, + data: Optional[dict] = None, # request body + params: Optional[dict] = None, # url query string params + headers: Optional[dict] = None, # additional headers to include in request + max_retries: Optional[int] = None, # custom value for max number of retries + timeout: Optional[int] = None, # request timeout in seconds + ) -> Response: + if method not in self.ALLOWED_METHODS: + raise ValueError(f'Invalid HTTP method {method}.') + + max_retries = max_retries or self.max_retries + timeout = timeout or self.timeout + + # not thread safe + self.session.adapters['https://'].max_retries.total = max_retries + + response = self.session.request( + method=method, + url=self._get_url(path=path), + params=params, + json=data, + headers=headers, + timeout=timeout + ) + + response.raise_for_status() - response.raise_for_status() + return response - return response + return _request_helper diff --git a/tests/network/__init__.py b/corva/runners/__init__.py similarity index 100% rename from tests/network/__init__.py rename to corva/runners/__init__.py diff --git a/corva/runners/scheduled.py b/corva/runners/scheduled.py new file mode 
100644 index 00000000..8d30cdc5 --- /dev/null +++ b/corva/runners/scheduled.py @@ -0,0 +1,11 @@ +from typing import Any, Callable + +from corva.models.scheduled import ScheduledContext + + +def scheduled_runner(fn: Callable, context: ScheduledContext) -> Any: + result = fn(context.event, context.api, context.cache) + + context.api.post(path=f'scheduler/{context.event.schedule}/completed') + + return result diff --git a/corva/runners/stream.py b/corva/runners/stream.py new file mode 100644 index 00000000..feb7e8a3 --- /dev/null +++ b/corva/runners/stream.py @@ -0,0 +1,46 @@ +from typing import Any, Callable + +from corva.models.stream import StreamContext, StreamEvent, StreamStateData + + +def stream_runner(fn: Callable, context: StreamContext) -> Any: + if (last_timestamp := context.cache_data.last_processed_timestamp) is None: + last_timestamp = -1 # filtering will leave all records, as no timestamp can be negative + if (last_depth := context.cache_data.last_processed_depth) is None: + last_depth = -1 # filtering will leave all records, as no depth can be negative + + context.event = StreamEvent.filter( + event=context.event, + by_timestamp=context.filter_by_timestamp, + by_depth=context.filter_by_depth, + last_timestamp=last_timestamp, + last_depth=last_depth + ) + + result = fn(context.event, context.api, context.cache) + + last_processed_timestamp = max( + [ + record.timestamp + for record in context.event.records + if record.timestamp is not None + ], + default=context.cache_data.last_processed_timestamp + ) + last_processed_depth = max( + [ + record.measured_depth + for record in context.event.records + if record.measured_depth is not None + ], + default=context.cache_data.last_processed_depth + ) + + context.store_cache_data( + StreamStateData( + last_processed_timestamp=last_processed_timestamp, + last_processed_depth=last_processed_depth + ) + ) + + return result diff --git a/corva/settings.py b/corva/settings.py index 55833288..98913528 100644 --- a/corva/settings.py +++ b/corva/settings.py @@ -1,15 +1,40 @@ from os import getenv -from typing import Final +from typing import Optional -API_ROOT_URL: Final[str] = getenv('API_ROOT_URL') -DATA_API_ROOT_URL: Final[str] = getenv('DATA_API_ROOT_URL') -APP_KEY: Final[str] = getenv('APP_KEY') -APP_NAME: Final[str] = getenv('APP_NAME') -API_KEY: Final[str] = getenv('API_KEY') +from pydantic import BaseSettings -# Logger -LOG_LEVEL: Final[str] = getenv('LOG_LEVEL', 'WARN') -LOG_ASSET_ID: Final[int] = int(getenv('LOG_ASSET_ID', -1)) -# Storage -CACHE_URL: Final[str] = getenv('CACHE_URL') +class CorvaSettings(BaseSettings): + # api + API_ROOT_URL: Optional[str] = None + DATA_API_ROOT_URL: Optional[str] = None + API_KEY: Optional[str] = None + + # cache + CACHE_URL: Optional[str] = None + + # logger + LOG_LEVEL: str = 'WARN' + + # misc + APP_KEY: Optional[str] = None # . 
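+    # NOTE: the APP_NAME and PROVIDER properties below assume APP_KEY follows
+    # the '<provider>.<app-name-with-dashes>' pattern.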
+
+    @property
+    def APP_NAME(self) -> str:
+        if (app_name := getenv('APP_NAME')) is not None:
+            return app_name
+
+        app_name_with_dashes = self.APP_KEY.split('.')[1]
+        app_name = app_name_with_dashes.replace('-', ' ').title()
+
+        return app_name
+
+    @property
+    def PROVIDER(self) -> str:
+        if (provider := getenv('PROVIDER')) is not None:
+            return provider
+
+        return self.APP_KEY.split('.')[0]
+
+
+CORVA_SETTINGS = CorvaSettings()
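
As a quick sanity check, the derived properties behave as follows (assuming APP_NAME and
PROVIDER are not set in the environment; the key below is illustrative):

    from corva.settings import CorvaSettings

    settings = CorvaSettings(APP_KEY='provider.app-name')

    assert settings.PROVIDER == 'provider'   # text before the first '.'
    assert settings.APP_NAME == 'App Name'   # dashed suffix, title-cased
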
diff --git a/corva/state/redis_adapter.py b/corva/state/redis_adapter.py index 8597e026..79a6fa1f 100644 --- a/corva/state/redis_adapter.py +++ b/corva/state/redis_adapter.py @@ -1,12 +1,9 @@
 from datetime import timedelta
-from logging import Logger, LoggerAdapter
 from typing import Optional, List, Dict, Union
 
 from redis import Redis, from_url, ConnectionError
 
-from corva import settings
-from corva.types import REDIS_STORED_VALUE_TYPE
-from corva.logger import DEFAULT_LOGGER
+REDIS_STORED_VALUE_TYPE = Union[bytes, str, int, float]
 
 
 class RedisAdapter(Redis):
@@ -14,15 +11,13 @@ class RedisAdapter(Redis):
 
     def __init__(
         self,
-        default_name: str,
-        cache_url: str = settings.CACHE_URL,
-        logger: Union[Logger, LoggerAdapter] = DEFAULT_LOGGER,
+        name: str,
+        cache_url: str,
         **kwargs
     ):
         kwargs.setdefault('decode_responses', True)
         super().__init__(connection_pool=from_url(url=cache_url, **kwargs).connection_pool)
-        self.logger = logger
-        self.default_name = default_name
+        self.name = name
         try:
             self.ping()
         except ConnectionError as exc:
@@ -30,48 +25,38 @@ def __init__(
 
     def hset(
         self,
-        name: Optional[str] = None,
         key: Optional[str] = None,
         value: Optional[REDIS_STORED_VALUE_TYPE] = None,
         mapping: Optional[Dict[str, REDIS_STORED_VALUE_TYPE]] = None,
         expiry: Union[int, timedelta, None] = DEFAULT_EXPIRY
     ) -> int:
-        name = name or self.default_name
+        n_set = super().hset(name=self.name, key=key, value=value, mapping=mapping)
 
-        n_set = super().hset(name=name, key=key, value=value, mapping=mapping)
-
-        if expiry is None and self.pttl(name=name) > 0:
-            self.persist(name=name)
+        if expiry is None and self.pttl() > 0:
+            self.persist(name=self.name)
 
         if expiry is not None:
-            self.expire(name=name, time=expiry)
+            self.expire(name=self.name, time=expiry)
 
         return n_set
 
-    def hget(self, key: str, name: Optional[str] = None) -> Union[REDIS_STORED_VALUE_TYPE, None]:
-        name = name or self.default_name
-        return super().hget(name=name, key=key)
+    def hget(self, key: str) -> Union[REDIS_STORED_VALUE_TYPE, None]:
+        return super().hget(name=self.name, key=key)
 
-    def hgetall(self, name: Optional[str] = None) -> Dict[str, Union[REDIS_STORED_VALUE_TYPE, None]]:
-        name = name or self.default_name
-        return super().hgetall(name=name)
+    def hgetall(self) -> Dict[str, REDIS_STORED_VALUE_TYPE]:
+        return super().hgetall(name=self.name)
 
-    def hdel(self, keys: List[str], name: Optional[str] = None) -> int:
-        name = name or self.default_name
-        return super().hdel(name, *keys)
+    def hdel(self, keys: List[str]) -> int:
+        return super().hdel(self.name, *keys)
 
-    def delete(self, *names: List[str]) -> int:
-        names = names or [self.default_name]
-        return super().delete(*names)
+    def delete(self) -> int:
+        return super().delete(self.name)
 
-    def ttl(self, name: Optional[str] = None) -> int:
-        name = name or self.default_name
-        return super().ttl(name=name)
+    def ttl(self) -> int:
+        return super().ttl(name=self.name)
 
-    def pttl(self, name: Optional[str] = None) -> int:
-        name = name or self.default_name
-        return super().pttl(name=name)
+    def pttl(self) -> int:
+        return super().pttl(name=self.name)
 
-    def exists(self, *names: List[str]) -> int:
-        names = names or [self.default_name]
-        return super().exists(*names)
+    def exists(self) -> int:
+        return super().exists(self.name)
diff --git a/corva/state/redis_state.py b/corva/state/redis_state.py index 5c3a66aa..32e76ec6 100644 --- a/corva/state/redis_state.py +++ b/corva/state/redis_state.py @@ -1,7 +1,3 @@
-from logging import Logger, LoggerAdapter
-from typing import Union
-
-from corva.logger import DEFAULT_LOGGER
 from corva.state.redis_adapter import RedisAdapter
 
 
@@ -12,30 +8,37 @@ class RedisState:
     This class provides an interface to save, load and do other operations with data in Redis cache.
     """
 
-    def __init__(self, redis: RedisAdapter, logger: Union[Logger, LoggerAdapter] = DEFAULT_LOGGER):
+    def __init__(self, redis: RedisAdapter):
         self.redis = redis
-        self.logger = logger
 
-    def store(self, **kwargs):
-        return self.redis.hset(**kwargs)
+    @property
+    def store(self):
+        return self.redis.hset
 
-    def load(self, **kwargs):
-        return self.redis.hget(**kwargs)
+    @property
+    def load(self):
+        return self.redis.hget
 
-    def load_all(self, **kwargs):
-        return self.redis.hgetall(**kwargs)
+    @property
+    def load_all(self):
+        return self.redis.hgetall
 
-    def delete(self, **kwargs):
-        return self.redis.hdel(**kwargs)
+    @property
+    def delete(self):
+        return self.redis.hdel
 
-    def delete_all(self, *names):
-        return self.redis.delete(*names)
+    @property
+    def delete_all(self):
+        return self.redis.delete
 
-    def ttl(self, **kwargs):
-        return self.redis.ttl(**kwargs)
+    @property
+    def ttl(self):
+        return self.redis.ttl
 
-    def pttl(self, **kwargs):
-        return self.redis.pttl(**kwargs)
+    @property
+    def pttl(self):
+        return self.redis.pttl
 
-    def exists(self, *names):
-        return self.redis.exists(*names)
+    @property
+    def exists(self):
+        return self.redis.exists
diff --git a/corva/types.py b/corva/types.py deleted file mode 100644 index 87c3c08b..00000000 --- a/corva/types.py +++ /dev/null @@ -1,3 +0,0 @@
-from typing import Union
-
-REDIS_STORED_VALUE_TYPE = Union[bytes, str, int, float]
diff --git a/corva/utils.py b/corva/utils.py deleted file mode 100644 index 34401117..00000000 --- a/corva/utils.py +++ /dev/null @@ -1,22 +0,0 @@
-from corva.event import Event
-
-
-class GetStateKey:
-    @classmethod
-    def _get_key(cls, asset_id: int, app_stream_id: int, app_key: str, app_connection_id: int):
-        provider = cls._get_provider(app_key=app_key)
-        state_key = f'{provider}/well/{asset_id}/stream/{app_stream_id}/{app_key}/{app_connection_id}'
-        return state_key
-
-    @staticmethod
-    def _get_provider(app_key: str) -> str:
-        return app_key.split('.')[0]
-
-    @classmethod
-    def from_event(cls, event: Event, app_key: str):
-        return cls._get_key(
-            asset_id=event[0].asset_id,
-            app_stream_id=event[0].app_stream_id,
-            app_key=app_key,
-            app_connection_id=event[0].app_connection_id
-        )
diff --git a/docs_src/tutorial_1_hello_world.py b/docs_src/tutorial_1_hello_world.py index 96ac93f2..c40edd6d 100644 --- a/docs_src/tutorial_1_hello_world.py +++ b/docs_src/tutorial_1_hello_world.py @@ -1,20 +1,16 @@
-from corva import Api, Corva, StreamEvent, State
+from corva import Api, Cache, Corva, StreamEvent
 
 
-app = Corva()  # 1 initialize the app
-
-
-@app.stream  # 2 add decorator with needed event type to your function
-def stream_app(event: StreamEvent, api: Api, state: State):
-    # 3 above, add parameters with predefined types, that will be injected automatically
-
-    """User's main logic function"""
-    pass
+# 1 define your function with the essential parameters that will be provided by Corva
+def 
stream_app(event: StreamEvent, api: Api, cache: Cache): + """Main logic function""" pass +# 2 define function that will be run by AWS lambda def lambda_handler(event, context): - # 4 define function that will be run by AWS lambda - """AWS lambda handler""" - stream_app(event) # 5 pass only event as parameter to your function call + app = Corva() # 3 initialize the app + app.stream(stream_app, event) # 4 run stream app diff --git a/docs_src/tutorial_2_configuration.py b/docs_src/tutorial_2_configuration.py index f1b994df..dcd53256 100644 --- a/docs_src/tutorial_2_configuration.py +++ b/docs_src/tutorial_2_configuration.py @@ -1,21 +1,8 @@ -from corva import Api, Corva, StreamEvent, State +from corva import Api, Cache, Corva, StreamEvent -app = Corva( - # 1 api params - api_url='api.localhost', - api_data_url='api.data.localhost', - api_key='api_key', - api_app_name='api_app_name', - # 2 state params - state_url='redis://', - state_params={'param1': 'val1'} -) - - -@app.stream -def stream_app(event: StreamEvent, api: Api, state: State): - """User's main logic function""" +def stream_app(event: StreamEvent, api: Api, cache: Cache): + """Main logic function""" pass @@ -23,4 +10,5 @@ def stream_app(event: StreamEvent, api: Api, state: State): def lambda_handler(event, context): """AWS lambda handler""" - stream_app(event) + app = Corva() + app.stream(stream_app, event, filter_by_timestamp=True) diff --git a/tests/app/test_base.py b/tests/app/test_base.py index e9cb8f84..3acb0348 100644 --- a/tests/app/test_base.py +++ b/tests/app/test_base.py @@ -3,8 +3,9 @@ from corva.app.base import BaseApp from corva.event import Event -from corva.models.base import BaseEventData -from tests.conftest import ComparableException, APP_KEY, CACHE_URL +from corva.models.base import CorvaBaseModel +from corva.settings import CORVA_SETTINGS +from tests.conftest import ComparableException @pytest.fixture(scope='function') @@ -13,34 +14,28 @@ def base_app(mocker: MockerFixture): # so in order to initialize and test the class we patch __abstractmethods__ mocker.patch.object(BaseApp, '__abstractmethods__', set()) - return BaseApp(app_key=APP_KEY, cache_url=CACHE_URL) + return BaseApp(app_key=CORVA_SETTINGS.APP_KEY, cache_url=CORVA_SETTINGS.CACHE_URL, api=None) def test_run_exc_in_event_loader_load(mocker: MockerFixture, base_app): loader_mock = mocker.patch.object(BaseApp, 'event_loader') loader_mock.load.side_effect = Exception - logger_spy = mocker.spy(base_app, 'logger') with pytest.raises(Exception): base_app.run(event='') - logger_spy.error.assert_called_once_with('Could not prepare events for run.') - def test_run_exc_in__group_event(mocker: MockerFixture, base_app): mocker.patch.object(BaseApp, 'event_loader') mocker.patch.object(base_app, '_group_event', side_effect=Exception) - logger_spy = mocker.spy(base_app, 'logger') with pytest.raises(Exception): base_app.run(event='') - logger_spy.error.assert_called_once_with('Could not prepare events for run.') - def test_run_runs_for_each_event(mocker: MockerFixture, base_app): - event1 = Event([BaseEventData(a=1)]) - event2 = Event([BaseEventData(a=2)]) + event1 = Event([CorvaBaseModel(a=1)]) + event2 = Event([CorvaBaseModel(a=2)]) mocker.patch.object(BaseApp, 'event_loader') mocker.patch.object(base_app, '_group_event', return_value=[event1, event2]) @@ -54,9 +49,9 @@ def test_run_runs_for_each_event(mocker: MockerFixture, base_app): def test__group_event(mocker: MockerFixture, base_app): event = Event( - [BaseEventData(app_connection_id=1), - 
BaseEventData(app_connection_id=1), - BaseEventData(app_connection_id=2)] + [CorvaBaseModel(app_connection_id=1), + CorvaBaseModel(app_connection_id=1), + CorvaBaseModel(app_connection_id=2)] ) expected = [ [event[0], event[1]], @@ -72,26 +67,21 @@ def test__group_event(mocker: MockerFixture, base_app): def test__run_exc_in_get_context(mocker: MockerFixture, base_app): mocker.patch.object(base_app, 'get_context', side_effect=Exception) - logger_spy = mocker.spy(base_app, 'logger') with pytest.raises(Exception): base_app._run(event=Event([])) - logger_spy.error.assert_called_once_with('Could not get context.') - def test__run_exc_in_pre_process(mocker: MockerFixture, base_app): context = 'context' mocker.patch.object(base_app, 'get_context', return_value=context) mocker.patch.object(base_app, 'pre_process', side_effect=ComparableException) - logger_spy = mocker.spy(base_app, 'logger') on_fail_spy = mocker.spy(base_app, 'on_fail') with pytest.raises(ComparableException): base_app._run(event=Event([])) - logger_spy.error.assert_called_once_with('An error occurred in process pipeline.') on_fail_spy.assert_called_once_with(context=context, exception=ComparableException()) @@ -103,14 +93,12 @@ def test__run_exc_in_process(mocker: MockerFixture, base_app): mocker.patch.object(base_app, 'get_context', return_value=context) pre_spy = mocker.spy(base_app, 'pre_process') mocker.patch.object(base_app, 'process', side_effect=ComparableException) - logger_spy = mocker.spy(base_app, 'logger') on_fail_spy = mocker.spy(base_app, 'on_fail') with pytest.raises(ComparableException): base_app._run(event=Event([])) pre_spy.assert_called_once_with(context=context) - logger_spy.error.assert_called_once_with('An error occurred in process pipeline.') on_fail_spy.assert_called_once_with(context=context, exception=ComparableException()) @@ -123,7 +111,6 @@ def test__run_exc_in_post_process(mocker: MockerFixture, base_app): pre_spy = mocker.spy(base_app, 'pre_process') process_spy = mocker.spy(base_app, 'process') mocker.patch.object(base_app, 'post_process', side_effect=ComparableException) - logger_spy = mocker.spy(base_app, 'logger') on_fail_spy = mocker.spy(base_app, 'on_fail') with pytest.raises(ComparableException): @@ -131,7 +118,6 @@ def test__run_exc_in_post_process(mocker: MockerFixture, base_app): pre_spy.assert_called_once_with(context=context) process_spy.assert_called_once_with(context=context) - logger_spy.error.assert_called_once_with('An error occurred in process pipeline.') on_fail_spy.assert_called_once_with(context=context, exception=ComparableException()) diff --git a/tests/app/test_scheduled.py b/tests/app/test_scheduled.py deleted file mode 100644 index e467de9b..00000000 --- a/tests/app/test_scheduled.py +++ /dev/null @@ -1,73 +0,0 @@ -import pytest -from pytest_mock import MockerFixture - -from corva.app.scheduled import ScheduledApp -from corva.event import Event -from corva.models.scheduled import ScheduledContext, ScheduledEventData -from tests.conftest import APP_KEY, CACHE_URL - - -@pytest.fixture(scope='function') -def scheduled_app(api): - return ScheduledApp(api=api, app_key=APP_KEY, cache_url=CACHE_URL) - - -@pytest.fixture(scope='module') -def scheduled_event_data_factory(): - def _scheduled_event_data_factory(**kwargs): - default_kwargs = { - 'cron_string': str(), - 'environment': str(), - 'app': int(), - 'app_key': str(), - 'app_version': None, - 'app_connection_id': int(), - 'app_stream_id': int(), - 'source_type': str(), - 'company': int(), - 'provider': str(), - 'schedule': 
int(), - 'interval': int(), - 'schedule_start': int(), - 'schedule_end': int(), - 'asset_id': int(), - 'asset_name': str(), - 'asset_type': str(), - 'timezone': str(), - 'log_type': str() - } - default_kwargs.update(kwargs) - - return ScheduledEventData(**default_kwargs) - - return _scheduled_event_data_factory - - -@pytest.fixture(scope='function') -def scheduled_context_factory(scheduled_event_data_factory, redis): - def _scheduled_context_factory(**kwargs): - default_params = { - 'event': Event([scheduled_event_data_factory()]), - 'state': redis - } - default_params.update(kwargs) - - return ScheduledContext(**default_params) - - return _scheduled_context_factory - - -def test_group_by_field(): - assert ScheduledApp.group_by_field == 'app_connection_id' - - -def test_update_schedule_status(mocker: MockerFixture, scheduled_app): - schedule = 1 - status = 'status' - - mocker.patch.object(scheduled_app.api.session, 'request') - post_spy = mocker.patch.object(scheduled_app.api, 'post') - - scheduled_app.update_schedule_status(schedule=schedule, status=status) - - post_spy.assert_called_once_with(path=f'scheduler/{schedule}/{status}') diff --git a/tests/app/test_task.py b/tests/app/test_task.py index 8618d22d..2e213498 100644 --- a/tests/app/test_task.py +++ b/tests/app/test_task.py @@ -4,14 +4,23 @@ from corva.app.task import TaskApp from corva.event import Event from corva.models.task import TaskStatus, TaskData, TaskEventData, TaskContext, UpdateTaskData -from tests.conftest import APP_KEY, CACHE_URL +from corva.network.api import Api +from corva.settings import CORVA_SETTINGS TASK_ID = '1' @pytest.fixture(scope='function') -def task_app(api): - return TaskApp(api=api, app_key=APP_KEY, cache_url=CACHE_URL) +def task_app(): + return TaskApp( + api=Api( + api_url=CORVA_SETTINGS.API_ROOT_URL, + data_api_url=CORVA_SETTINGS.DATA_API_ROOT_URL, + api_key=CORVA_SETTINGS.API_KEY, + app_name=CORVA_SETTINGS.APP_NAME + ), + app_key=CORVA_SETTINGS.APP_KEY, cache_url=CORVA_SETTINGS.CACHE_URL + ) @pytest.fixture(scope='session') @@ -74,12 +83,12 @@ def test_get_task_data(mocker: MockerFixture, task_app, task_data_factory): 'request', return_value=mocker.Mock(**{'json.return_value': task_data.dict()}) ) - get_spy = mocker.spy(task_app.api, 'get') + type(task_app.api).get = mocker.PropertyMock(return_value=mocker.Mock(wraps=task_app.api.get)) result = task_app.get_task_data(task_id=TASK_ID) assert task_data == result - get_spy.assert_called_once_with(path=f'v2/tasks/{TASK_ID}') + task_app.api.get.assert_called_once_with(path=f'v2/tasks/{TASK_ID}') def test_update_task_data(mocker: MockerFixture, task_app): @@ -87,8 +96,8 @@ def test_update_task_data(mocker: MockerFixture, task_app): data = UpdateTaskData() mocker.patch.object(task_app.api.session, 'request') - put_spy = mocker.spy(task_app.api, 'put') + type(task_app.api).put = mocker.PropertyMock(return_value=mocker.Mock(wraps=task_app.api.put)) task_app.update_task_data(task_id=TASK_ID, status=status, data=data) - put_spy.assert_called_once_with(path=f'v2/tasks/{TASK_ID}/{status}', data=data.dict()) + task_app.api.put.assert_called_once_with(path=f'v2/tasks/{TASK_ID}/{status}', data=data.dict()) diff --git a/tests/conftest.py b/tests/conftest.py index 3a8fc494..3978f97e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,16 +1,8 @@ -from pathlib import Path +from functools import partial from unittest.mock import patch import pytest -from fakeredis import FakeRedis - -from corva.network.api import Api -from corva.state.redis_adapter import 
RedisAdapter -from corva.state.redis_state import RedisState - -APP_KEY = 'provider.app-name' -CACHE_URL = 'redis://localhost:6379' -DATA_PATH = Path('tests/test_data') +from fakeredis import FakeRedis, FakeServer @pytest.fixture(scope='function', autouse=True) @@ -25,26 +17,29 @@ def patch_redis_adapter(): redis_adapter_patcher = patch(f'{redis_adapter_path}.RedisAdapter.__bases__', (FakeRedis,)) + server = FakeServer() # use FakeServer to share cache between different instances of RedisState + with redis_adapter_patcher, \ - patch(f'{redis_adapter_path}.from_url', side_effect=FakeRedis.from_url): + patch(f'{redis_adapter_path}.from_url', side_effect=partial(FakeRedis.from_url, server=server)): # necessary to stop mock.patch from trying to call delattr when reversing the patch redis_adapter_patcher.is_local = True yield -@pytest.fixture(scope='function') -def redis_adapter(patch_redis_adapter): - return RedisAdapter(default_name='default_name', cache_url=CACHE_URL) - - -@pytest.fixture(scope='function') -def redis(redis_adapter): - return RedisState(redis=redis_adapter) - - -@pytest.fixture(scope='function') -def api(): - return Api(api_url='https://api.localhost.ai', data_api_url='https://data.localhost.ai') +@pytest.fixture(scope='function', autouse=True) +def patch_corva_settings(mocker): + """replaces empty values in global corva settings with proper test values""" + + settings_path = 'corva.settings.CORVA_SETTINGS' + + mocker.patch.multiple( + settings_path, + APP_KEY='provider.app-name', + CACHE_URL='redis://localhost:6379', + API_ROOT_URL='https://api.localhost.ai', + DATA_API_ROOT_URL='https://data.localhost.ai' + ) + yield class ComparableException(Exception): diff --git a/tests/state/__init__.py b/tests/docs_src/__init__.py similarity index 100% rename from tests/state/__init__.py rename to tests/docs_src/__init__.py diff --git a/tests/docs_src/test_tutorial_1.py b/tests/docs_src/test_tutorial_1.py new file mode 100644 index 00000000..639e4cb0 --- /dev/null +++ b/tests/docs_src/test_tutorial_1.py @@ -0,0 +1,11 @@ +from corva.settings import CORVA_SETTINGS +from docs_src import tutorial_1_hello_world + + +def test_tutorial(): + event = ( + '[{"records": [{"asset_id": 0, "company_id": 0, "version": 0, "collection": "", "data": {}}], ' + '"metadata": {"app_stream_id": 0, "apps": {"%s": {"app_connection_id": 0}}}, "asset_id": 0}]' + ) % CORVA_SETTINGS.APP_KEY + + tutorial_1_hello_world.lambda_handler(event, None) diff --git a/tests/docs_src/test_tutorial_2.py b/tests/docs_src/test_tutorial_2.py new file mode 100644 index 00000000..26f114ae --- /dev/null +++ b/tests/docs_src/test_tutorial_2.py @@ -0,0 +1,12 @@ +from corva.settings import CORVA_SETTINGS +from docs_src import tutorial_2_configuration + + +def test_tutorial(): + event = ( + '[{"records": [{"timestamp": 0, "asset_id": 0, "company_id": 0, "version": 0, "collection": "", ' + '"data": {}}], "metadata": {"app_stream_id": 0, "apps": {"%s": {"app_connection_id": 0}}}, ' + '"asset_id": 0}]' + ) % CORVA_SETTINGS.APP_KEY + + tutorial_2_configuration.lambda_handler(event, None) diff --git a/tests/loader/test_scheduled.py b/tests/loader/test_scheduled.py deleted file mode 100644 index cca64597..00000000 --- a/tests/loader/test_scheduled.py +++ /dev/null @@ -1,18 +0,0 @@ -import pytest - -from corva.models.scheduled import ScheduledEvent -from tests.conftest import DATA_PATH - - -@pytest.fixture(scope='module') -def scheduled_event_str() -> str: - with open(DATA_PATH / 'scheduled_event.json') as scheduled_event: - return 
scheduled_event.read() - - -def test_load(scheduled_event_str): - """test that sample scheduled event loaded without exceptions""" - - event = ScheduledEvent.from_raw_event(scheduled_event_str) - - assert len(event) == 3 diff --git a/tests/loader/test_stream.py b/tests/loader/test_stream.py deleted file mode 100644 index 52dd3abb..00000000 --- a/tests/loader/test_stream.py +++ /dev/null @@ -1,18 +0,0 @@ -import pytest - -from corva.models.stream import StreamEvent -from tests.conftest import DATA_PATH - - -@pytest.fixture(scope='module') -def stream_event_str() -> str: - with open(DATA_PATH / 'stream_event.json') as stream_event: - return stream_event.read() - - -def test_load_from_file(stream_event_str): - """Tests that stream event is loaded from file without exceptions.""" - - event = StreamEvent.from_raw_event(event=stream_event_str, app_key='corva.wits-depth-summary') - - assert len(event) == 1 diff --git a/tests/state/test_redis_adapter.py b/tests/state/test_redis_adapter.py deleted file mode 100644 index 7dfd43cb..00000000 --- a/tests/state/test_redis_adapter.py +++ /dev/null @@ -1,118 +0,0 @@ -from datetime import datetime, timedelta - -import pytest -from fakeredis import FakeServer -from freezegun import freeze_time -from redis import ConnectionError - -from corva.state.redis_adapter import RedisAdapter - -NAME = 'NAME' -KEY = 'key' -VAL = 'val' -MAPPING = {'key1': 'val1', 'key2': 'val2'} - - -def test_connect(redis_adapter): - assert redis_adapter.ping() - - -def test_init_connect_exc(patch_redis_adapter): - server = FakeServer() - server.connected = False - - fake_cache_url = 'redis://random:123' - - with pytest.raises(ConnectionError) as exc: - RedisAdapter(default_name='name', cache_url=fake_cache_url, server=server) - assert str(exc.value) == f'Could not connect to Redis with URL: {fake_cache_url}' - - -@pytest.mark.parametrize('name', (None, NAME)) -def test_hset_and_hget(redis_adapter, name): - assert redis_adapter.hset(name=name, key=KEY, value=VAL) == 1 - assert redis_adapter.hget(name=name, key=KEY) == VAL - - -@pytest.mark.parametrize('name', (None, NAME)) -def test_hset_mapping_and_hgetall(redis_adapter, name): - assert redis_adapter.hset(name=NAME, mapping=MAPPING) == len(MAPPING) - assert redis_adapter.hgetall(name=NAME) == MAPPING - - -@pytest.mark.parametrize('name', (None, NAME)) -def test_hdel_and_exists(redis_adapter, name): - def exists(): - if name is None: - return redis_adapter.exists() - return redis_adapter.exists(name) - - assert redis_adapter.hset(name=name, key=KEY, value=VAL) == 1 - assert exists() - assert redis_adapter.hdel(keys=[KEY], name=name) == 1 - assert not exists() - - -@pytest.mark.parametrize('name', (None, NAME)) -def test_delete_and_exists(redis_adapter, name): - def exists(): - if name is None: - return redis_adapter.exists() - return redis_adapter.exists(name) - - def delete(): - if name is None: - return redis_adapter.delete() - else: - return redis_adapter.delete(name) - - assert redis_adapter.hset(name=name, key=KEY, value=VAL) == 1 - assert exists() - assert delete() - assert not exists() - - -@pytest.mark.parametrize('name', (None, NAME)) -def test_ttl(redis_adapter, name): - with freeze_time('2020'): - assert redis_adapter.hset(name=name, key=KEY, value=VAL) == 1 - assert redis_adapter.ttl(name=name) == redis_adapter.DEFAULT_EXPIRY.total_seconds() - - -@pytest.mark.parametrize('name', (None, NAME)) -def test_pttl(redis_adapter, name): - with freeze_time('2020'): - assert redis_adapter.hset(name=name, key=KEY, value=VAL) == 1 - 
assert redis_adapter.pttl(name=name) == redis_adapter.DEFAULT_EXPIRY.total_seconds() * 1000
-
-
-def test_hset_default_expiry(redis_adapter):
-    with freeze_time('2020'):
-        redis_adapter.hset(key=KEY, value=VAL)
-        assert redis_adapter.ttl() == RedisAdapter.DEFAULT_EXPIRY.total_seconds()
-
-
-def test_hset_expiry_override(redis_adapter):
-    with freeze_time('2020'):
-        for expiry in [10, 5, 20]:
-            redis_adapter.hset(key=KEY, value=VAL, expiry=expiry)
-            assert redis_adapter.ttl() == expiry
-
-
-def test_hset_expiry_disable(redis_adapter):
-    with freeze_time('2020'):
-        redis_adapter.hset(key=KEY, value=VAL, expiry=5)
-        assert redis_adapter.ttl() == 5
-
-        redis_adapter.hset(key=KEY, value=VAL, expiry=None)
-        assert redis_adapter.ttl() == -1
-
-
-def test_hset_expiry(redis_adapter):
-    with freeze_time('2020') as frozen_time:
-        now = datetime.utcnow()
-        redis_adapter.hset(key=KEY, value=VAL, expiry=5)
-        frozen_time.move_to(now + timedelta(seconds=5))
-        assert redis_adapter.exists()
-        frozen_time.move_to(now + timedelta(seconds=5, microseconds=1))
-        assert not redis_adapter.exists()
diff --git a/tests/state/test_redis_state.py b/tests/state/test_redis_state.py deleted file mode 100644 index f7887e4b..00000000 --- a/tests/state/test_redis_state.py +++ /dev/null @@ -1,33 +0,0 @@
-from unittest.mock import patch
-
-import pytest
-
-KWARGS = {'key1': 'val1'}
-NAMES = ['1', '2']
-
-
-@pytest.mark.parametrize('call_func_name,mock_func_name', (
-    ('store', 'hset'),
-    ('load', 'hget'),
-    ('load_all', 'hgetall'),
-    ('delete', 'hdel'),
-    ('ttl', 'ttl'),
-    ('pttl', 'pttl'),
-))
-def test_all(redis, call_func_name, mock_func_name):
-    with patch.object(redis.redis, mock_func_name) as mock_func:
-        call_func = getattr(redis, call_func_name)
-        call_func(**KWARGS)
-        mock_func.assert_called_once_with(**KWARGS)
-
-
-def test_delete_all(redis):
-    with patch.object(redis.redis, 'delete') as mock_func:
-        redis.delete_all(*NAMES)
-        mock_func.assert_called_once_with(*NAMES)
-
-
-def test_exists(redis):
-    with patch.object(redis.redis, 'exists') as mock_func:
-        redis.exists(*NAMES)
-        mock_func.assert_called_once_with(*NAMES)
diff --git a/tests/network/test_api.py b/tests/test_api.py similarity index 65% rename from tests/network/test_api.py rename to tests/test_api.py index 11ea9776..d6757d59 100644 --- a/tests/network/test_api.py +++ b/tests/test_api.py @@ -1,5 +1,18 @@
 import pytest
 
+from corva.network.api import Api
+from corva.settings import CORVA_SETTINGS
+
+
+@pytest.fixture(scope='function')
+def api():
+    return Api(
+        api_url=CORVA_SETTINGS.API_ROOT_URL,
+        data_api_url=CORVA_SETTINGS.DATA_API_ROOT_URL,
+        api_key=CORVA_SETTINGS.API_KEY,
+        app_name=CORVA_SETTINGS.APP_NAME
+    )
+
 
 def test_default_headers(api):
     assert not {'Authorization', 'X-Corva-App'} - set(api.session.headers)
@@ -22,5 +35,5 @@ def test_get_url(api):
 def test_request_invalid_method(api):
     method = 'random'
     with pytest.raises(ValueError) as exc:
-        api._request(method=method, path='random')
+        api._request(method=method)(path='random')
     assert str(exc.value) == f'Invalid HTTP method {method}.'
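
For reference, a short sketch of calling the reworked request helpers, where get/post/etc. are
now properties returning a configured helper function (the URLs, path and params below are
illustrative only):

    api = Api(
        api_url='https://api.example.com',
        data_api_url='https://data.example.com',
        api_key='<api key>',
        app_name='My App'
    )

    # api.get resolves to a helper, so the call shape stays the same for callers:
    response = api.get('some/path', params={'param': 'value'}, timeout=30)
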
diff --git a/tests/test_data/scheduled_event.json b/tests/test_data/scheduled_event.json deleted file mode 100644 index dbb7783b..00000000 --- a/tests/test_data/scheduled_event.json +++ /dev/null @@ -1,116 +0,0 @@ -[ - [ - { - "type": "data_app", - "stream": "drilling-operations", - "follows": "corva.activity-group", - "category": "scheduling_app", - "drilling": { - "category": "" - }, - "batch_size": 10, - "completion": { - "category": "" - }, - "message_broker": "kafka", - "message_format": 2, - "scheduler_type": 2, - "whitelisted_app_connection_settings": { - "edit": [], - "read": [] - }, - "collection": "operations", - "cron_string": "*/5 * * * *", - "environment": "qa", - "app": 231, - "app_key": "corva.drilling-operations", - "app_version": null, - "app_connection": 269616, - "app_stream": 11792, - "source_type": "drilling", - "log_type": "time", - "company": 81, - "provider": "corva", - "api_url": "https://api.example.com", - "api_key": "SOME-API-KEY", - "schedule": 237252160, - "interval": 300, - "schedule_start": 1575970800000, - "schedule_end": 1575971100000, - "asset_id": 39293110, - "asset_name": "zauto_951_WITSML_0", - "asset_type": "Well", - "timezone": "America/Chicago", - "partition_number": 326, - "job": 7016 - }, - { - "type": "data_app", - "stream": "drilling-operations", - "follows": "corva.activity-group", - "category": "scheduling_app", - "drilling": { - "category": "" - }, - "batch_size": 10, - "completion": { - "category": "" - }, - "message_broker": "kafka", - "message_format": 2, - "scheduler_type": 2, - "whitelisted_app_connection_settings": { - "edit": [], - "read": [] - }, - "collection": "operations", - "cron_string": "*/5 * * * *", - "environment": "qa", - "app": 231, - "app_key": "corva.drilling-operations", - "app_version": null, - "app_connection": 269616, - "app_stream": 11792, - "source_type": "drilling", - "log_type": "time", - "company": 81, - "provider": "corva", - "api_url": "https://api.example.com", - "api_key": "SOME-API-KEY", - "schedule": 237252160, - "interval": 300, - "schedule_start": 1575971100000, - "schedule_end": 1575971400000, - "asset_id": 39293110, - "asset_name": "zauto_951_WITSML_0", - "asset_type": "Well", - "timezone": "America/Chicago", - "partition_number": 326, - "job": 7016 - } - ], - [ - { - "environment": "production", - "company": 1, - "provider": "my-company", - "asset_id": 2581235, - "asset_name": "My Well", - "asset_type": "Well", - "timezone": "America/Chicago", - "day_shift_start": "06:00", - "schedule": 402294, - "interval": 900, - "schedule_start": 1586678000000, - "schedule_end": 1586678900000, - "cron_string": "*/15 * * * *", - "app": 441, - "app_key": "my-company.my-drilling-app", - "app_version": "3", - "app_stream": 418264, - "app_connection": 1475510, - "source_type": "drilling", - "log_type": "time" - } - ] -] \ No newline at end of file diff --git a/tests/test_data/stream_event.json b/tests/test_data/stream_event.json deleted file mode 100644 index f556b2f6..00000000 --- a/tests/test_data/stream_event.json +++ /dev/null @@ -1,53 +0,0 @@ -[ - { - "metadata": { - "apps": { - "corva.wits-depth-summary": { - "app_connection_id": 1 - }, - "other.oil-price-app": { - "app_connection_id": 2 - } - }, - "app_stream_id": 294712 - }, - "records": [ - { - "asset_id": 1, - "timestamp": 1546300800, - "company_id": 24, - "version": 1, - "data": { - "hole_depth": 99.4, - "weight_on_bit": 1, - "state": "Some unnecessary drilling that's excluded" - }, - "collection": "collection" - }, - { - "asset_id": 1, - "timestamp": 
-        "company_id": 24,
-        "version": 1,
-        "data": {
-          "hole_depth": 99.4,
-          "weight_on_bit": 1,
-          "state": "Rotary Drilling"
-        },
-        "collection": "collection"
-      },
-      {
-        "asset_id": 1,
-        "measured_depth": 1.0,
-        "company_id": 24,
-        "version": 1,
-        "data": {
-          "hole_depth": 99.4,
-          "weight_on_bit": 1,
-          "state": "Rotary Drilling"
-        },
-        "collection": "collection"
-      }
-    ]
-  }
-]
\ No newline at end of file
diff --git a/tests/test_redis.py b/tests/test_redis.py
new file mode 100644
index 00000000..b8c99cad
--- /dev/null
+++ b/tests/test_redis.py
@@ -0,0 +1,91 @@
+from datetime import datetime, timedelta
+
+import pytest
+from fakeredis import FakeServer
+from freezegun import freeze_time
+from redis import ConnectionError
+
+from corva.settings import CORVA_SETTINGS
+from corva.state.redis_adapter import RedisAdapter
+from corva.state.redis_state import RedisState
+
+
+@pytest.fixture(scope='function')
+def redis():
+    redis_adapter = RedisAdapter(name='name', cache_url=CORVA_SETTINGS.CACHE_URL)
+    return RedisState(redis=redis_adapter)
+
+
+def test_init_connect_exc():
+    server = FakeServer()
+    server.connected = False
+
+    fake_cache_url = 'redis://random:123'
+
+    with pytest.raises(ConnectionError) as exc:
+        RedisAdapter(name='name', cache_url=fake_cache_url, server=server)
+    assert str(exc.value) == f'Could not connect to Redis with URL: {fake_cache_url}'
+
+
+def test_store_and_load(redis):
+    assert redis.store(key='key', value='val') == 1
+    assert redis.load(key='key') == 'val'
+
+
+def test_store_mapping_and_load_all(redis):
+    mapping = {'key1': 'val1', 'key2': 'val2'}
+
+    assert redis.store(mapping=mapping) == len(mapping)
+    assert redis.load_all() == mapping
+
+
+def test_delete_and_exists(redis):
+    assert redis.store(key='key', value='val') == 1
+    assert redis.exists()
+    assert redis.delete(keys=['key']) == 1
+    assert not redis.exists()
+
+
+def test_delete_all_and_exists(redis):
+    assert redis.store(key='key', value='val') == 1
+    assert redis.exists()
+    assert redis.delete_all()
+    assert not redis.exists()
+
+
+def test_ttl(redis):
+    with freeze_time('2020'):
+        assert redis.store(key='key', value='val') == 1
+        assert redis.ttl() > 0
+
+
+def test_pttl(redis):
+    with freeze_time('2020'):
+        assert redis.store(key='key', value='val') == 1
+        assert redis.pttl() > 0
+
+
+def test_store_expiry_override(redis):
+    with freeze_time('2020'):
+        for expiry in [10, 5, 20]:
+            redis.store(key='key', value='val', expiry=expiry)
+            assert redis.ttl() == expiry
+
+
+def test_store_expiry_disable(redis):
+    with freeze_time('2020'):
+        redis.store(key='key', value='val', expiry=5)
+        assert redis.ttl() == 5
+
+        redis.store(key='key', value='val', expiry=None)
+        assert redis.ttl() == -1
+
+
+def test_store_expiry(redis):
+    with freeze_time('2020') as frozen_time:
+        now = datetime.utcnow()
+        redis.store(key='key', value='val', expiry=5)
+        frozen_time.move_to(now + timedelta(seconds=5))
+        assert redis.exists()
+        frozen_time.move_to(now + timedelta(seconds=5, microseconds=1))
+        assert not redis.exists()
diff --git a/tests/test_scheduled_app.py b/tests/test_scheduled_app.py
new file mode 100644
index 00000000..29ed8901
--- /dev/null
+++ b/tests/test_scheduled_app.py
@@ -0,0 +1,24 @@
+from unittest.mock import Mock, MagicMock, PropertyMock
+
+from corva.application import Corva
+
+
+def scheduled_app(event, api, state):
+    api.session.request = MagicMock()
+    type(api).post = PropertyMock(return_value=Mock(wraps=api.post))
+    return api
+
+
+def test_set_completed_status():
+    event = (
+        '[[{"cron_string": "", "environment": "", "app": 0, "app_key": "", "app_connection_id": 0, "app_stream_id": 0, '
+        '"source_type": "", "company": 0, "provider": "", "schedule": 0, "interval": 0, '
+        '"schedule_start": "1970-01-01T00:00:00", "schedule_end": "1970-01-01T00:00:00", "asset_id": 0, '
+        '"asset_name": "", "asset_type": "", "timezone": "", "log_type": ""}]]'
+    )
+
+    app = Corva()
+
+    results = app.scheduled(scheduled_app, event)
+
+    results[0].post.assert_called_once_with(path='scheduler/0/completed')
diff --git a/tests/test_stream_app.py b/tests/test_stream_app.py
new file mode 100644
index 00000000..6b924d0a
--- /dev/null
+++ b/tests/test_stream_app.py
@@ -0,0 +1,130 @@
+import pytest
+
+from corva.application import Corva
+from corva.settings import CORVA_SETTINGS
+
+
+def stream_app(event, api, cache):
+    return event
+
+
+@pytest.mark.parametrize(
+    'collection, expected',
+    [
+        ('wits.completed', 0),
+        ('random', 1)
+    ]
+)
+def test_is_completed(collection, expected):
+    event = (
+        '[{"records": [{"asset_id": 0, "company_id": 0, "version": 0, "collection": "%s", "data": {}}],'
+        ' "metadata": {"app_stream_id": 0, "apps": {"%s": {"app_connection_id": 0}}}, "asset_id": 0}]'
+    ) % (collection, CORVA_SETTINGS.APP_KEY)
+
+    app = Corva()
+
+    results = app.stream(stream_app, event)
+
+    assert len(results[0].records) == expected
+
+
+def test_asset_id_persists_after_no_records_left_after_filtering():
+    event = (
+        '[{"records": [{"asset_id": 123, "company_id": 0, "version": 0, "collection": "wits.completed", '
+        '"data": {}}], "metadata": {"app_stream_id": 0, "apps": {"%s": {"app_connection_id": 0}}}, '
+        '"asset_id": 123}]'
+    ) % CORVA_SETTINGS.APP_KEY
+
+    app = Corva()
+
+    results = app.stream(stream_app, event)
+
+    assert len(results[0].records) == 0
+    assert results[0].asset_id == 123
+
+
+@pytest.mark.parametrize(
+    'filter_by,record_attr',
+    [
+        ('filter_by_timestamp', 'timestamp'),
+        ('filter_by_depth', 'measured_depth')
+    ]
+)
+def test_filter_by(filter_by, record_attr):
+    event = (
+        '[{"records": [{"%s": -2, "asset_id": 0, "company_id": 0, "version": 0, "collection": "", '
+        '"data": {}}, {"%s": -1, "asset_id": 0, "company_id": 0, "version": 0, "collection": "", '
+        '"data": {}}, {"%s": 0, "asset_id": 0, "company_id": 0, "version": 0, "collection": "", '
+        '"data": {}}], "metadata": {"app_stream_id": 0, "apps": {"%s": {"app_connection_id": 0}}}, '
+        '"asset_id": 0}]'
+    ) % (record_attr, record_attr, record_attr, CORVA_SETTINGS.APP_KEY)
+
+    app = Corva()
+
+    results = app.stream(stream_app, event, **{filter_by: True})
+
+    assert len(results[0].records) == 1
+    assert getattr(results[0].records[0], record_attr) == 0
+
+
+@pytest.mark.parametrize(
+    'filter_by,record_attr',
+    [
+        ('filter_by_timestamp', 'timestamp'),
+        ('filter_by_depth', 'measured_depth')
+    ]
+)
+def test_filter_by_value_saved_for_next_run(filter_by, record_attr):
+    # first invocation
+    event = (
+        '[{"records": [{"%s": 0, "asset_id": 0, "company_id": 0, "version": 0, "collection": "", '
+        '"data": {}}, {"%s": 1, "asset_id": 0, "company_id": 0, "version": 0, "collection": "", '
+        '"data": {}}, {"%s": 2, "asset_id": 0, "company_id": 0, "version": 0, "collection": "", '
+        '"data": {}}], "metadata": {"app_stream_id": 0, "apps": {"%s": {"app_connection_id": 0}}}, '
+        '"asset_id": 0}]'
+    ) % (record_attr, record_attr, record_attr, CORVA_SETTINGS.APP_KEY)
+
+    app = Corva()
+
+    results = app.stream(stream_app, event, **{filter_by: True})
+
+    assert len(results[0].records) == 3
+
+    # second invocation
+    next_event = (
( + '[{"records": [{"%s": 0, "asset_id": 0, "company_id": 0, "version": 0, "collection": "", ' + '"data": {}}, {"%s": 1, "asset_id": 0, "company_id": 0, "version": 0, "collection": "", ' + '"data": {}}, {"%s": 2, "asset_id": 0, "company_id": 0, "version": 0, "collection": "", ' + '"data": {}}, {"%s": 3, "asset_id": 0, "company_id": 0, "version": 0, "collection": "", ' + '"data": {}}], "metadata": {"app_stream_id": 0, "apps": {"%s": {"app_connection_id": 0}}}, ' + '"asset_id": 0}]' + ) % (record_attr, record_attr, record_attr, record_attr, app.settings.APP_KEY) + + next_results = app.stream(stream_app, next_event, **{filter_by: True}) + + assert len(next_results[0].records) == 1 + assert getattr(next_results[0].records[0], record_attr) == 3 + + +def test_empty_records_error(): + event = ( + '[{"records": [], "metadata": {"app_stream_id": 0, "apps": {"%s": {"app_connection_id": 0}}}, ' + '"asset_id": 0}]' + ) % CORVA_SETTINGS.APP_KEY + + app = Corva() + + with pytest.raises(ValueError): + app.stream(stream_app, event) + + +def test_only_one_filter_allowed_at_a_time(): + event = ( + '[{"records": [{"asset_id": 0, "company_id": 0, "version": 0, "collection": "", "data": {}}], ' + '"metadata": {"app_stream_id": 0, "apps": {"%s": {"app_connection_id": 0}}}, "asset_id": 0}]' + ) % CORVA_SETTINGS.APP_KEY + + app = Corva() + + with pytest.raises(ValueError): + app.stream(stream_app, event, filter_by_timestamp=True, filter_by_depth=True) diff --git a/tests/test_utils.py b/tests/test_utils.py deleted file mode 100644 index ea55f77f..00000000 --- a/tests/test_utils.py +++ /dev/null @@ -1,31 +0,0 @@ -from corva.event import Event -from corva.models.base import BaseEventData -from corva.utils import GetStateKey - -PROVIDER = 'provider' -APP_KEY = f'{PROVIDER}.app-key' -ASSET_ID = 1 -APP_STREAM_ID = 2 -APP_CONNECTION_ID = 3 -STATE_KEY = f'{PROVIDER}/well/{ASSET_ID}/stream/{APP_STREAM_ID}/{APP_KEY}/{APP_CONNECTION_ID}' - - -def test_GetStateKey__get_provider(): - assert GetStateKey._get_provider(app_key=APP_KEY) == PROVIDER - - -def test_GetStateKey__get_key(): - state_key = GetStateKey._get_key( - asset_id=ASSET_ID, - app_stream_id=APP_STREAM_ID, - app_key=APP_KEY, - app_connection_id=APP_CONNECTION_ID - ) - assert state_key == STATE_KEY - - -def test_GetStateKey_from_event(): - event = Event( - [BaseEventData(asset_id=ASSET_ID, app_stream_id=APP_STREAM_ID, app_connection_id=APP_CONNECTION_ID)] - ) - assert GetStateKey.from_event(event=event, app_key=APP_KEY) == STATE_KEY