Source code for xer_parser.model.tasks

from xer_parser.model.classes.task import Task
from xer_parser.model.classes.taskactv import TaskActv
from xer_parser.model.classes.taskpred import TaskPred

__all__ = ["Tasks"]


[docs] class Tasks: """ This class is a collection of tasks that controls functionalities to search, add, update and delete tasks """
[docs] def __init__(self) -> None: self.index = 0 self._tasks = []
[docs] def add(self, params, data) -> None: task = Task(params, data) self._tasks.append(task)
@property def activities(self) -> list[Task]: return self._tasks @property def count(self) -> int: return len(self._tasks) @property def has_no_successor(self) -> list: return list( filter( lambda x: x.task_id not in [z.pred_task_id for z in TaskPred.obj_list], self._tasks, ) ) @property def has_no_predecessor(self) -> list: return list( filter( lambda x: x.task_id not in [z.task_id for z in TaskPred.obj_list], self._tasks, ) ) def __len__(self) -> int: return len(self._tasks) def __repr__(self) -> str: return str([x.task_code for x in self._tasks]) def __str__(self) -> str: return str([str(x.task_code) for x in self._tasks]) @property def constraints(self) -> list: lst = [ x.constraints if x.constraints is not None else None for x in self._tasks ] # print(lst) return list(filter(lambda x: x is not None, lst))
[docs] def find_by_id(self, id): # TODO: Add correct return type annotation obj = list(filter(lambda x: x.task_id == id, self._tasks)) if len(obj) > 0: return obj[0] return obj
[docs] def find_by_code(self, code): # TODO: Add correct return type annotation obj = list(filter(lambda x: x.task_code == code, self._tasks)) if len(obj) > 0: return obj[0] return obj
[docs] def duration_greater_than( self, duration ): # TODO: Add correct return type annotation obj = list( filter( lambda x: x.target_drtn_hr_cnt > duration * float(self.calendar.day_hr_cnt), self._tasks, ) ) if obj: return obj return obj
[docs] def float_less_than(self, Tfloat): # TODO: Add correct return type annotation objs = list(filter(lambda x: x.status_code != "TK_Complete", self._tasks)) obj = list( filter( lambda x: x.total_float_hr_cnt < Tfloat * float(x.calendar.day_hr_cnt), objs, ) ) if obj: return obj return obj
[docs] def float_greater_than(self, Tfloat): # TODO: Add correct return type annotation objs = list(filter(lambda x: x.status_code != "TK_Complete", self._tasks)) obj = list( filter( lambda x: x.total_float_hr_cnt > Tfloat * float(x.calendar.day_hr_cnt), objs, ) ) if obj: return obj return obj
[docs] def float_within_range(self, float1, float2): obj = None objs = list(filter(lambda x: x.status_code != "TK_Complete", self._tasks)) if float1 < float2: obj = list( filter( lambda x: x.total_float_hr_cnt >= float1 * float(x.calendar.day_hr_cnt) and x.total_float_hr_cnt <= float2 * float(x.calendar.day_hr_cnt), objs, ) ) if obj: return obj return obj
[docs] def float_within_range_exclusive(self, float1, float2): obj = None objs = list(filter(lambda x: x.status_code != "TK_Complete", self._tasks)) if float1 < float2: obj = list( filter( lambda x: x.total_float_hr_cnt > float1 * float(x.calendar.day_hr_cnt) and x.total_float_hr_cnt < float2 * float(x.calendar.day_hr_cnt), objs, ) ) if obj: return obj return obj
[docs] def activities_by_status(self, status): return list(filter(lambda x: x.status_code == status, self._tasks))
[docs] def activities_by_wbs_id(self, id): return list(filter(lambda x: x.wbs_id == id, self._tasks))
[docs] def activities_by_activity_code_id(self, id): objs = list(filter(lambda x: x.actv_code_id == id, TaskActv.obj_list)) activities = [] for obj in objs: activities.append(self.find_by_id(obj.task_id)) return activities
[docs] def no_predecessors(self): return list( filter( lambda x: x.task_id not in [z.task_id for z in TaskPred.obj_list], self._tasks, ) )
[docs] def no_successors(self): return list( filter( lambda x: x.task_id not in [z.pred_task_id for z in TaskPred.obj_list], self._tasks, ) )
[docs] def activities_with_hard_contratints(self): return list( filter( lambda x: x.cstr_type == "CS_MEO" or x.cstr_type == "CS_MSO", self._tasks, ) )
[docs] def activities_by_type(self, type): return list(filter(lambda x: x.cstr_type == type, self._tasks))
[docs] def get_tsv(self): tsv = [] if len(self._tasks) > 0: tsv.append(["%T", "TASK"]) tsv.append( [ "%F", "task_id", "proj_id", "wbs_id", "clndr_id", "phys_complete_pct", "rev_fdbk_flag", "est_wt", "lock_plan_flag", "auto_compute_act_flag", "complete_pct_type", "task_type", "duration_type", "status_code", "task_code", "task_name", "rsrc_id", "total_float_hr_cnt", "free_float_hr_cnt", "remain_drtn_hr_cnt", "act_work_qty", "remain_work_qty", "target_work_qty", "target_drtn_hr_cnt", "target_equip_qty", "act_equip_qty", "remain_equip_qty", "cstr_date", "act_start_date", "act_end_date", "late_start_date", "late_end_date", "expect_end_date", "early_start_date", "early_end_date", "restart_date", "reend_date", "target_start_date", "target_end_date", "rem_late_start_date", "rem_late_end_date", "cstr_type", "priority_type", "suspend_date", "resume_date", "float_path", "float_path_order", "guid", "tmpl_guid", "cstr_date2", "cstr_type2", "driving_path_flag", "act_this_per_work_qty", "act_this_per_equip_qty", "external_early_start_date", "external_late_end_date", "create_date", "update_date", "create_user", "update_user", "location_id", ] ) for task in self._tasks: tsv.append(task.get_tsv()) return tsv
[docs] def get_by_project(self, id): return list(filter(lambda x: x.proj_id == id, self._tasks))
def __iter__(self) -> "Tasks": return self def __next__(self) -> Task: if self.index >= len(self._tasks): raise StopIteration idx = self.index self.index += 1 return self._tasks[idx]