# -*- python -*-
#
# Copyright 2016-2025 Inria - CIRAD - INRAe
#
# Distributed under the Cecill-C License.
# See accompanying file LICENSE.txt or copy at
# http://www.cecill.info/licences/Licence_CeCILL-C_V1-en.html
#
# WebSite : https://github.com/openalea/astk
#
# File author(s): Christian Fournier <christian.fournier@inrae.fr>
#
# ==============================================================================
"""
Provides utilities for scheduling models in simulation
"""
from __future__ import division
from functools import reduce
import numpy
[docs]
class TimeControlSet:
def __init__(self, **kwd):
""" Create a TimeControlSet , that is a simple class container for named object"""
self.__dict__.update(kwd)
[docs]
def check(self,attname,defaultvalue):
""" Check if an attribute exists. If not create it with default value """
if not hasattr(self,attname):
setattr(self,attname,defaultvalue)
[docs]
def simple_delay_timing(delay = 1, steps =1):
return (TimeControlSet(dt=delay) if not i % delay else TimeControlSet(dt=0) for i in range(steps))
[docs]
class TimeControl:
def __init__(self, delay=None, steps=None, model=None, weather=None, start_date=None):
""" create a generator-like timecontrol object """
self.delay = delay
self.steps = steps
self.model = model
self.weather = weather
self.start_date = start_date
try:
self._timing = model.timing(delay=delay, steps=steps, weather=weather, start_date=start_date)
except:
if model is not None:
print('Warning : not able to call model.timing correctly !!!')
try:
self._timing = simple_delay_timing(delay=delay, steps=steps)# a generator of timecontrolset objects to be used during a simulation
except:
self._timing = simple_delay_timing()
def __iter__(self):
return TimeControl(delay=self.delay, steps=self.steps, model=self.model, weather=self.weather, start_date=self.start_date)
def __next__(self):
return next(self._timing)
[docs]
class TimeControler:
def __init__(self, **kwd):
""" create a controler for parallel run of time controls
Allows to emulate 'discrete event'-like evaluation of timecontrol objects in a script
"""
self._timedict = dict(kwd)
self.numiter = 0
def __iter__(self):
self._timedict = dict((k,iter(v)) for k,v in self._timedict.items())
self.numiter = 0
return self
def __next__(self):
d = dict((k,next(v)) for k,v in self._timedict.items())
if len(d) == 0:
raise StopIteration
self.numiter += 1
return d
# new approach
[docs]
def evaluation_sequence(delays):
""" retrieve evaluation filter from sequence of delays
"""
seq = [[True if i == 0 else False for i in range(int(d))] for d in delays]
return reduce(lambda x,y: x + y, seq)
[docs]
class EvalValue:
def __init__(self, eval, value, dt):
self.eval = eval
self.value = value
self.dt = dt
def __nonzero__(self):
return self.eval
[docs]
class IterWithDelays:
def __init__(self, values = [None], delays = [1]):
self.delays = delays
self.values = values
self._evalseq = iter(evaluation_sequence(delays))
self._iterable = iter(values)
self._iterdelays = iter(delays)
def __iter__(self):
return IterWithDelays(self.values, self.delays)
def __next__(self):
self.ev = next(self._evalseq)
if self.ev :
try: #prevent value exhaustion to stop iterating
self.val = next(self._iterable)
self.dt = next(self._iterdelays)
except StopIteration:
pass
return EvalValue(self.ev, self.val, self.dt)
def _truncdata(data, before, after, last):
d = data.truncate(before = before, after = after)
if after.to_datetime64() < last.to_datetime64():
d = d.iloc[:-1,]
return d
[docs]
def time_control(time_sequence, eval_filter, data=None):
""" Produces controls for multi-delay or weather dependant models
return splited weather data (if given) and delays
:Parameters:
----------
- `time_sequence` (panda dateTime index)
A sequence of TimeStamps indicating the dates of all elementary time steps of the simulation
- `eval_filter` a list (same length as time_sequence) of bools indicating the steps at which an evaluation is needed
- `data` (panda dataframe indexed by date)
data for the model
"""
starts = time_sequence[eval_filter]
ends = starts[1:].tolist() + [time_sequence[-1]]
last = time_sequence[-1]
if data is not None:
controls = [((end - start).total_seconds() // 3600, _truncdata(data, start, end, last)) for start,end in zip(starts,ends)]
else:
controls = [((end - start).total_seconds() // 3600, None) for start,end in zip(starts,ends)]
delays, values = list(zip(*controls))
return values, delays
[docs]
def time_filter(time_sequence, delay = 1):
""" return an evaluation filter being True at regular period
:Parameters:
----------
- `time_sequence` (panda dateTime index)
A sequence of TimeStamps indicating the dates of all elementary time steps of the simulation
- `delay` (int)
The duration of each period
"""
time = [(t - time_sequence[0]).total_seconds() / 3600 for t in time_sequence]
filter = [t % delay == 0 for t in time]
return filter
[docs]
def time_filter_node(time_sequence, delay = 1):
filter = time_filter(time_sequence, delay)
return time_sequence, filter
#time_filter_node.__doc__ = time_filter.__doc__
[docs]
def date_filter(time_sequence, time_data):
"""
Return evaluation filter being True at date in time_data
- time_data : a datetimle indexed panda dataframe
"""
filter = [True if d.to_datetime64() in time_data.index.to_datetime64() else False for d in time_sequence]
return filter
[docs]
def date_filter_node(time_sequence, time_data):
filter = date_filter(time_sequence, time_data)
return time_sequence, filter, time_data
[docs]
def rain_filter(time_sequence, weather, rain_min = 0.2):
""" return an evaluation filter iterating every rain event and every between-rain event
:Parameters:
----------
- `time_sequence` (panda dateTime index)
A sequence of TimeStamps indicating the dates of all elementary time steps of the simulation
- `weather` (weather instance)
weather database (should contain rain column)
"""
try:
rain = weather.data.rain[time_sequence]
except:
#strange extract needed on visualea 1.0 (to test again with ipython in visualea)
rain_data = weather.data[['rain']]
rain = numpy.array([float(rain_data.loc[d]) for d in time_sequence])
#rain = weather_data.rain[time_sequence]
rain[rain <= rain_min] = 0
rain[rain > 0] = 1
filter = [True] +(rain[1:] != rain[:-1]).tolist()
return filter
[docs]
def rain_filter_node(time_sequence, weather):
filter = rain_filter(time_sequence, weather)
return time_sequence, filter, weather.data
[docs]
class DegreeDayModel:
""" Classical degreeday model equation
"""
#import numpy as np
def __init__(self, Tbase = 0):
self.Tbase = Tbase
def __call__(self, time_sequence, weather_data):
""" Compute thermal time accumulation over time_sequence
:Parameters:
----------
- `time_sequence` (panda dateTime index)
A sequence of TimeStamps indicating the dates of all elementary time steps of the simulation
- weather (openalea.astk.Weather instance)
A Weather database
"""
try:
Tair = weather_data.temperature_air[time_sequence]
except:
#strange extract needed on visualea 1.0 (to test again with ipython in visualea)
T_data = weather_data[['temperature_air']]
Tair = numpy.array([float(T_data.loc[d]) for d in time_sequence])
Tcut = numpy.maximum(numpy.zeros_like(Tair), Tair - self.Tbase)
days = [0] + [((t - time_sequence[0]).total_seconds()+ 3600) / 3600 / 24 for t in time_sequence]
dt = numpy.diff(days).tolist()
return numpy.cumsum(Tcut * dt)
# functional call for nodes
[docs]
def degree_day_model(Tbase = 0):
return DegreeDayModel(Tbase)
[docs]
def thermal_time(time_sequence, weather_data, model = DegreeDayModel(Tbase = 0)):
return model(time_sequence, weather_data)
[docs]
def thermal_time_filter(time_sequence, weather, model = DegreeDayModel(Tbase = 0), delay = 10):
""" return an evaluation filter being True at regular thermal time period
:Parameters:
----------
- `time_sequence` (panda dateTime index)
A sequence of TimeStamps indicating the dates of all elementary time steps of the simulation
- weather (openalea.astk.Weather instance)
A Weather database
- `model` a model returning Thermal Time accumulation as a function of time_sequence and weather
- `delay` (int)
The duration of each period
"""
TT = thermal_time(time_sequence, weather.data, model)
intTT = (TT/delay).astype(int)
filter = [True] +(intTT[1:] != intTT[:-1]).tolist()
return filter
[docs]
def thermal_time_filter_node(time_sequence, weather, model, delay):
filter = thermal_time_filter(time_sequence, weather, model, delay)
return time_sequence, filter, weather.data, model
[docs]
def filter_or(filters):
return reduce(lambda x,y: numpy.array(x) | numpy.array(y), filters)
[docs]
def filter_and(filters):
return reduce(lambda x,y: numpy.array(x) & numpy.array(y), filters)
try:
from openalea.core.system.systemnodes import IterNode
except ImportError:
IterNode = object
pass
[docs]
class IterWithDelaysNode(IterNode):
""" Iteration Node """
[docs]
def eval(self):
"""
Return True if the node need a reevaluation
"""
try:
if self.iterable == "Empty":
self.iterable = iter(self.inputs[0])
self.iterdelay = iter(self.inputs[1])
self.wait = self.inputs[1][-1]
if(hasattr(self, "nextval")):
self.outputs[0] = self.nextval
else:
self.outputs[0] = next(self.iterable)
self.nextval = next(self.iterable)
delay = next(self.iterdelay)
self.outputs[1] = delay
self.outputs[2] = numpy.random.random() #used to trigger lazy nodes every delay
return delay
except TypeError as e:
self.outputs[0] = self.inputs[0]
self.outputs[1] = self.inputs[1]
return False
except StopIteration as e:
if self.wait > 1:
self.wait -= 1
return True
else:
self.iterable = "Empty"
if(hasattr(self, "nextval")):
del self.nextval
return False
#from datetime import datetime, timedelta
#import pytz
##import numpy as np
# class TimeSequence(object):
# """ Create / manipulate 'actual time' sequences for simulations
# """
# def __init__(self, start_date ='2000-10-01 01:00:00', time_step = 1, steps = 24):
# """ Create a datetime sequence from start_date to start_date + steps days, every time step hours
# datetime object are created as UTC
# """
# start = pytz.utc.localize(datetime.strptime(start_date, "%Y-%m-%d %H:%M:%S"))
# self.steps = steps
# self.time_steps = [time_step for i in range(steps)]
# self.time = [start + i * timedelta(hours=time_step) for i in range(steps)]
# def as_localtime(self, local_tz = pytz.timezone('Europe/Paris'), format = "%Y-%m-%d %H:%M:%S"):
# return [utc_dt.astimezone(local_tz) for utc_dt in self.time]
# def formated(self, time = None, format = "%Y-%m-%d %H:%M:%S"):
# if time is None:
# return [t.strftime(format) for t in self.time]
# else:
# return [t.strftime(format) for t in time]
[docs]
def new_canopy(plant_model, age = 0):
g = plant_model.setup_canopy(age)
return g, plant_model
[docs]
def grow_canopy(g,plant_model,time_control):
g = plant_model.grow(g,time_control)
return g, plant_model
[docs]
def plot_canopy(g,plant_model):
s = plant_model.plot(g)
return s, plant_model