Create a time series information system
========================================

In the basics chapter, we've seen how to use a number of essential
building blocks. Building on this knowledge, we want to address a more
ambitious goal: how to write a coherent, well-curated time series
information system.

We will start from the acquisition of time series data. There are some
challenges there and we will address the most common. Once we have a
complete "primary data" set, made of raw time series, we will start the
process of building the **refined** series that we can use:

* as input for models
* as input for high-level analysis (e.g. fundamental economic analysis)
* as a basis for complex dashboards and decision support systems

At the end of this chapter, you will master all fundamental aspects of
building a complete system.

Pipelines: starting with injecting data
----------------------------------------

Data acquisition is the first step. Let's start with a simple example,
in which we will get forecasts for renewable production in France. We
will use a ready-made open source client to get data from the
`ENTSO-E `_ data repository. ENTSO-E itself exposes data for free in
the context of the EU power markets.

.. code-block:: python
   :linenos:

   import pandas as pd

   from entsoe.exceptions import NoMatchingDataError
   from entsoe import EntsoePandasClient
   from tshistory.api import timeseries


   def get_res_generation_forecast_entsoe():
       fromdate = pd.to_datetime('today') - pd.Timedelta(1, 'd')
       todate = pd.to_datetime('today') + pd.Timedelta(7, 'd')
       try:
           df = EntsoePandasClient(api_key='apikey').query_wind_and_solar_forecast(
               'FR',
               start=fromdate.tz_localize('Europe/Paris'),
               end=todate.tz_localize('Europe/Paris'),
           )
       except NoMatchingDataError:
           return

       name_map = {
           'Solar': 'power.prod.solar.fr.mwh.entsoe.fcst.h',
           'Wind Onshore': 'power.prod.windonshore.fr.mwh.entsoe.fcst.h'
       }

       tsa = timeseries()
       for col, name in name_map.items():
           tsa.update(name, df[col], author='entsoe-scraper')

Such a scraping function, put into a Python script, could then be
scheduled to run at periodic moments of the day to continuously feed
the time series repository; for instance using `cron` on Linux or `at`
on Windows. This is a bare-bones solution, which unsurprisingly is
widely used in the industry. Some prefer sophisticated and refined task
schedulers such as `Apache Airflow `_, for the additional capabilities
they provide. Later in this document we will show how to use the
Refinery integrated task manager to set up powerful data acquisition
pipelines.

Now, let's get back to our scraping function and examine what it does
in detail.

.. code:: python

   fromdate = pd.to_datetime('today') - pd.Timedelta(1, 'd')
   todate = pd.to_datetime('today') + pd.Timedelta(7, 'd')

Here we start by defining the time window for the data to be fetched.
Since we are dealing with forecasts, a window from yesterday to next
week is reasonable.

.. code:: python

   try:
       df = EntsoePandasClient(api_key='apikey').query_wind_and_solar_forecast(
           'FR',
           start=fromdate.tz_localize('Europe/Paris'),
           end=todate.tz_localize('Europe/Paris'),
       )
   except NoMatchingDataError:
       return

Next, we fetch the data using the dedicated method of the ENTSO-E
client. We provide the `zone` and the time window. If no matching data
is found, we exit immediately.

.. code:: python

   name_map = {
       'Solar': 'power.prod.solar.fr.mwh.entsoe.fcst.h',
       'Wind Onshore': 'power.prod.windonshore.fr.mwh.entsoe.fcst.h'
   }

The resulting data frame may have several columns.
We *know* (by reading the documentation) that in our case (zone `FR`)
we will have two columns. We construct a mapping from these column
names to the series names we want to have. It is very important to have
a good naming convention (in our case we indicate: the nature of the
flux, the type of production, the zone, the unit, the provenance of the
data, the nature of the data (a forecast, as opposed to observed or
nominated data) and finally the expected granularity).

.. code:: python

   tsa = timeseries()
   for col, name in name_map.items():
       tsa.update(name, df[col], author='entsoe-scraper')

We create the time series refinery api object and then do an update of
the series. This is one of the simplest things we can do to acquire
data.

.. note::

   This however is a bit ad hoc and lacks separation of concerns:

   * acquisition and injection happen in the same place
   * the horizon/time window handling is hard-coded
   * there is no handling of metadata, so we won't easily know the
     provenance of the acquired data
   * the zone is hard-coded as well

   In the chapter on advanced topics we will propose a more structured
   way to organize this and address the issues listed above.

Using the task manager
----------------------

To have this scraping function running, we need a scheduler. We propose
a small tour of the integrated task manager. Here is how it looks:

.. image:: ./tasks.png

The first tab shows the list of tasks that are either:

* queued
* running
* done (or failed or aborted)

Each task shown there is defined by a Python function and an input. The
tasks do not run at arbitrary times: the schedulers tab shows how tasks
will be scheduled.

.. image:: ./schedulers.png

The most important column in this view is `scheduler`: it exposes a
scheduling rule whose syntax follows the conventions of the `cron `_
tool.

.. note::

   The cron-like notation used for the scheduler actually supports
   rules down to the second, hence the 6 fields. The rules for seconds
   are the same as for minutes.

The `plan` tab shows the tasks that will be scheduled within the next
hour.

.. image:: ./plan.png

This is all we need to finely control our scraping activities. Now
let's see how to get our scraper in there!

Writing and using scrapers for primary series
---------------------------------------------

We need to define a task:

.. code:: python

   from rework.api import task


   @task
   def scrap_res_generation_forecast_entsoe(task):
       get_res_generation_forecast_entsoe()

This is the strict minimum. For comfort, we can do a bit more, as in
this version:

.. code:: python

   from rework.api import task


   @task
   def scrap_res_generation_forecast_entsoe(task):
       with task.capturelogs(std=True):
           get_res_generation_forecast_entsoe()
           print('Success !')

The `capturelogs` context manager makes sure the task system collects
all the logs emitted while the task runs. The `std` parameter also
captures everything coming out of the `print` function. Very handy to
trace and follow what is going on.

.. note::

   The `@task` decorator supports various options not shown here. You
   will find a complete description of its capabilities in the chapter
   on the task manager. Notable options are `domain` and `timeout`.

Having written this, we must register the task. We'll do it like this
(assuming you have put the code above in a Python module named
`mytasks.py`):

.. code:: bash

   $ rework register-operations refinery mytasks.py

Now, this is ready to run. You can go to the `Launchers` tab and start
your task immediately, or go to the `Schedulers` tab and set up its
scheduling rule (there is an action for that at the bottom of the
page).
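For instance, a rule such as the one below would launch the scraper
every day at 07:15, 13:15 and 19:15. This is only an illustration: we
assume here that the seconds field comes first, as in most 6-field cron
dialects; the chapter on the task manager documents the exact
convention.

.. code:: text

   0 15 7,13,19 * * *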
.. image:: ./newsched.png

When the task appears in the tasks list, you can click on its name to
see the logs. It may look like this:

.. image:: ./tasklog.png

Building formulas on top to get curated series
----------------------------------------------

With the basics of data injection in place, we can now turn to the
construction of the time series we actually want.

It is certainly possible to write *computing functions* and put them
into tasks like we did for data acquisition, but we propose a nicer way
to do it. Indeed, arbitrary Python code is very powerful but it has
downsides:

* it is unsafe (you just can't accept arbitrary Python code from your
  analysts on the server side of operations), so you need a lengthy and
  costly validation workflow to get it deployed
* it is (by default at least) not type-safe
* it takes quite some time to learn to do it properly
* it is not easily reusable, composable or shareable (still possible,
  but it takes a lot of learning to do it properly)
* seeing the computation may be difficult and auditing it can quickly
  become very complicated

We have a solution to these issues:

* use a very simple, safe, domain-specific expression language to do
  the computations
* call this a "formula language" (as in Excel), suitable for
  non-programmers
* do the computations on the fly (by default: materialized computations
  remain possible, especially for performance reasons)
* provide a vast array of composable primitive operators (all of pandas
  can be made available if necessary, plus more)
* define computed time series from this base, that will a) belong to
  the time series referential and b) look like a stored time series for
  all intents and purposes (for the read operations at least)

By limiting the computational power, we actually gain more power!

In the basics chapter we learnt how to create and use formulas. Let's
see how to apply this knowledge to build our time series information
system.

A first step would be *normalization*. Its purpose is to address the
following (non-exhaustive) list of questions:

* do the raw series have the needed granularities?
* are these granularities actually constant? If we perform our analysis
  on e.g. hourly time series, can we make sure our computation will
  start from hourly series (without throwing away data)?
* are the raw data using the units we want to work with (e.g. MWh,
  euros)?
* are the raw series complete or do we need to combine some together to
  get a coherent picture?
* how do we manage outliers?

So we can write formulas that provide a first layer of transformation.

Assume we scrape the series
`power.price.spot.at.da.eurmwh.entsoe.obs.raw` from ENTSO-E; that
series flipped from hourly to 15-minute granularity in October 2022,
and later reverted back to hourly. We want to shield ourselves from
this discontinuity. So let's write a simple formula:

.. code:: lisp

   (resample "power.price.spot.at.da.eurmwh.entsoe.obs.raw" "h")

Using the `register_formula` api call, we create
`power.price.spot.at.da.eurmwh.entsoe.obs.h`:

.. code:: python

   tsa.register_formula(
       'power.price.spot.at.da.eurmwh.entsoe.obs.h',
       '(resample "power.price.spot.at.da.eurmwh.entsoe.obs.raw" "h")'
   )

Meteo temperature time series from `ERA5 `_ are provided in Kelvin. A
conversion to Celsius could be useful:

.. code:: lisp

   (+ -273.15 (series "meteo.area.ch.2t.k.era5.obs.h"))
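As with the resample example above, such a formula can be registered
under its own name. Here is a minimal sketch; the target name
`meteo.area.ch.2t.c.era5.obs.h` is only an assumption, following the
same naming convention:

.. code:: python

   tsa.register_formula(
       'meteo.area.ch.2t.c.era5.obs.h',
       '(+ -273.15 (series "meteo.area.ch.2t.k.era5.obs.h"))'
   )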
Completeness: it is not uncommon to have raw series describing the same
fundamental data, while differing in a subtle way. For instance, one
source has a long history but its update frequency on recent data is
spotty; and then you find another source with less history depth but a
more stringent update behaviour. Let's combine them into a well-behaved
series:

.. code:: lisp

   (priority
    (series "power.price.spot.dk1.da.eurmwh.entsoe.obs.h")
    (series "power.price.spot.dk1.da.eurmwh.epex.obs.h"))

The `priority` operator is immensely helpful there. It will take the
points of the first series if there are any; barring that, the points
of the second series are selected.

Another situation for `priority`, with 3 series this time: some fluxes
have observed data, nominated data and forecast data. We can combine
these three into a single series:

.. code:: lisp

   (priority
    (series "gas.flow.ne.dunkirk.gassco.msmd.obs")
    (series "gas.flow.ne.dunkirk.gassco.msmd.nom")
    (series "gas.flow.ne.dunkirk.gassco.msmd.fcst"))

Outliers: this is a complex topic, highly dependent on the nature of
the data. There exist simple situations where bluntly removing
obviously bogus values works. To do that, we can use the `clip`
operator.

.. code:: lisp

   (clip (series "this-series-cannot-be-negative") #:min 0)

Feeding a model with series, getting the output in the system, with the task manager
--------------------------------------------------------------------------------------

Overview
........

In this section, we demonstrate how to implement a model in the
timeseries refinery. As an example, we use a simple linear regression,
to emphasize the interaction with the refinery.

A modelling process will alternate between two main phases: calibration
and forecast. The calibration process needs input and output data and
produces calibration parameters. The forecast process needs input data
and calibration parameters and produces an output estimation.

The calibration parameters are stored in the database, which allows us
to decouple the two operations. Moreover, the evolution of these
parameters is valuable information, hence we want to store them without
deleting the previous versions.

For this purpose, we use the dbcache package shipped with
`tshistory_refinery`. This package proposes 3 types of API:

- a cache with an associated lifetime
- a key-value store
- a versioned key-value store

In this case we are using the latter (called `vkvstore`).

Initialisation
..............

We initialize, in the Postgres database, a new namespace dedicated to
our model parameters. To do that, one has to type the following
command:

.. code:: bash

   $ dbcache init-db --namespace calibration

Here are now the necessary Python functions. First, the imports:

.. code:: python

   import json

   import pandas as pd
   from sklearn.linear_model import LinearRegression
   from sklearn.metrics import r2_score

   from tshistory.api import timeseries
   from dbcache.api import vkvstore

Calibration per se:

.. code:: python

   def calibrate(tsa):
       # input
       x = tsa.get('meteo.area.fr.2t.k.era5.obs.h')
       # output
       y = tsa.get('power.load.fr.mwh.entsoe.obs.h')
       x.name = 'x'
       y.name = 'y'
       common = pd.concat([x, y], axis=1, join='inner')

       predictor = LinearRegression(n_jobs=-1)
       predictor.fit(X=common[['x']], y=common[['y']])
       slope = predictor.coef_[0][0]
       intercept = predictor.intercept_[0]

       # we want to store some indicator of fitness:
       r2 = r2_score(
           common[['y']],
           predictor.predict(common[['x']])
       )

       # we prepare the serialization of the calibration parameters
       # (the pickle method should be avoided for archival purposes)
       results = {
           'slope': slope,
           'intercept': intercept,
           'r2': r2,
       }
       return results
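Before wiring this into a task, the calibration can be checked
interactively. A minimal sketch, assuming the two input series above
already exist in your refinery:

.. code:: python

   from tshistory.api import timeseries

   tsa = timeseries()
   results = calibrate(tsa)
   # the dictionary contains the 'slope', 'intercept' and 'r2' keys
   print(results)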
Save the calibration parameters:

.. code:: python

   def save_coefficients(tsa, results):
       store = vkvstore(tsa.uri, 'calibration')
       # the vkvstore uses strings as keys and byte strings as values
       store.set(
           'linear-calibration-key',
           json.dumps(results).encode('utf-8')
       )

.. note::

   Note the "calibration" string in the vkvstore api call: it
   references the string used as namespace in the `dbcache init-db`
   command.

Now for the forecasting, we need to load the saved parameters:

.. code:: python

   def load_coefficients(tsa):
       store = vkvstore(tsa.uri, 'calibration')
       stored = store.get('linear-calibration-key')
       return json.loads(stored.decode('utf-8'))

And finally we read the input data, apply the model to it and store the
result as a time series:

.. code:: python

   def forecast(tsa, coefficients):
       x = tsa.get('meteo.area.fr.2t.k.era5.obs.h')
       predicted = x * coefficients['slope'] + coefficients['intercept']
       name = 'power.load.fr.mwh.linear.fcst.h'
       tsa.update(
           name,
           predicted,
           author='linear-model'
       )

We can now assemble these operations in two distinct tasks:

.. code:: python

   @task(inputs=(), domain='models')
   def linear_calibration(task):
       with task.capturelogs(std=True):
           tsa = timeseries()
           results = calibrate(tsa)
           save_coefficients(tsa, results)

.. code:: python

   @task(inputs=(), domain='models')
   def linear_forecast(task):
       with task.capturelogs(std=True):
           tsa = timeseries()
           coefficients = load_coefficients(tsa)
           forecast(tsa, coefficients)

These tasks can then be scheduled according to your
calibration/forecasting needs.
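As with the scraping task earlier, these operations must be registered
with the task manager before they can be launched or scheduled;
assuming they live in the same `mytasks.py` module as before:

.. code:: bash

   $ rework register-operations refinery mytasks.py

From there, the `Schedulers` tab lets you attach a rule to each task,
for instance a weekly rule for `linear_calibration` and a daily one for
`linear_forecast`.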