# Source code for pypeline.core.pipeline

# -*- coding: utf-8 -*-

# The imports below enable code that is compatible with both Python 2 and 3.
# They require python-future; install it with `pip install future`.
from __future__ import (absolute_import, division,
                        print_function, unicode_literals)
from builtins import *

import asyncio
import collections
import collections.abc

from .funcwrap import connect, pipenize



class Pipeline(object):
    """Pipeline connecting a list of functions for streamlined data processing.

    Example::

        p = Pipeline([list of functions])
        for result in p.process(data_in):
            ...
        p.close()  # close the pipeline

    Attributes:
        func_list (list): the pipenized functions, in execution order.
        func_names (list): ``__name__`` of each entry of ``func_list``,
            index-aligned with it; used by the ``*_by_name`` methods.
        pipeline: the result of ``connect(func_list)``; calling it with one
            input must return something ``run_until_complete`` accepts.
            ``None`` while the pipeline was created empty.
        run_event: the asyncio event loop that ``process`` runs on.
    """

    def __init__(self, func_list=None):
        """
        Args:
            func_list (list): functions to run through the pipeline, in order
                (a tuple is also accepted). When omitted or empty, the
                pipeline starts empty and functions can be added later with
                ``insert``.
        """
        if isinstance(func_list, (list, tuple)) and func_list:
            self.func_list = list(pipenize(list(func_list)))
            self.func_names = [func.__name__ for func in self.func_list]
            self.pipeline = connect(self.func_list)
        else:
            # Always define the bookkeeping lists so insert() also works on
            # a pipeline that was created empty.
            self.func_list = []
            self.func_names = []
            self.pipeline = None
        # A private loop per instance: close() closes this loop, and that must
        # not break other Pipeline instances or other code using the default
        # loop. Relying on asyncio.get_event_loop() to create a loop is also
        # deprecated in recent Python versions.
        self.run_event = asyncio.new_event_loop()

    def _index_of(self, func_name):
        """Return the index of the first function named ``func_name``.

        ``func_name`` may be a function, in which case its ``__name__`` is
        used. Raises ValueError if no function has that name.
        """
        if callable(func_name):
            func_name = func_name.__name__
        return self.func_names.index(func_name)

    def insert(self, position, func):
        """Insert one function, or a list of functions, into the pipeline.

        Args:
            position (int): the index to insert at, with ``list.insert``
                semantics (negative indices count from the end; indices past
                the end append).
            func (function/list of function): the function(s) to insert. A
                list is inserted as one contiguous run that keeps its order.

        Raises:
            TypeError: if ``func`` is neither a callable nor a list.
        """
        if isinstance(func, list):
            new_funcs = func
        elif callable(func):
            new_funcs = [func]
        else:
            raise TypeError(
                "func must be a callable or a list of callables, got %r"
                % (func,))
        # Slice assignment inserts the whole run at once, so a list keeps its
        # order; repeated list.insert at a single position would reverse it.
        self.func_list[position:position] = [pipenize(f) for f in new_funcs]
        self.func_names[position:position] = [f.__name__ for f in new_funcs]
        self.pipeline = connect(self.func_list)

    def pop_by_name(self, func_name):
        """Remove the first function with the given name from the pipeline.

        Args:
            func_name (str or function): the name of the function to remove.
                A function may be passed instead; its ``__name__`` is used.

        Raises:
            ValueError: if no function in the pipeline has that name.
        """
        self.pop_by_idx(self._index_of(func_name))

    def pop_by_idx(self, idx):
        """Remove the function at the given index from the pipeline.

        This is useful when several functions share the same name.

        Args:
            idx (int): the index of the function to remove (negative indices
                count from the end).

        Raises:
            IndexError: if ``idx`` is out of range.
        """
        self.func_list.pop(idx)
        self.func_names.pop(idx)
        self.pipeline = connect(self.func_list)

    def swap_by_name(self, func1_name, func2_name):
        """Swap the positions of two functions identified by name.

        Args:
            func1_name (str or function): the name of the first function.
            func2_name (str or function): the name of the second function.

        Raises:
            ValueError: if either name is not in the pipeline.
        """
        self.swap_by_idx(self._index_of(func1_name),
                         self._index_of(func2_name))

    def swap_by_idx(self, idx1, idx2):
        """Swap the positions of the functions at two indices.

        Args:
            idx1 (int): the index of the first function.
            idx2 (int): the index of the second function.

        Raises:
            IndexError: if either index is out of range. Both indices are read
                before anything is written, so the pipeline is then unchanged.
        """
        funcs, names = self.func_list, self.func_names
        funcs[idx1], funcs[idx2] = funcs[idx2], funcs[idx1]
        names[idx1], names[idx2] = names[idx2], names[idx1]
        self.pipeline = connect(self.func_list)

    def process(self, data_in):
        """Run data through the pipeline and yield the results.

        If ``data_in`` is iterable, each of its items is run through the
        pipeline in turn and one result is yielded per item. Otherwise
        ``data_in`` itself is run once and a single result is yielded. After
        the last result, ``close()`` is called, so the event loop cannot be
        used again.

        Args:
            data_in: the input data of the pipeline, or an iterable data
                source.

        Yields:
            The pipeline's result for each input.

        Raises:
            RuntimeError: if the pipeline has no functions.
        """
        if self.pipeline is None:
            raise RuntimeError(
                "pipeline is empty: add functions before processing")
        # NOTE: str/bytes and dicts are iterable too, so they are processed
        # item by item (characters / keys), not as a single datum.
        if isinstance(data_in, collections.abc.Iterable):
            items = data_in
        else:
            items = (data_in,)
        for item in items:
            yield self.run_event.run_until_complete(self.pipeline(item))
        self.close()

    def close(self):
        """Close the pipeline's event loop; no further processing is possible.

        Calling it more than once is harmless.
        """
        self.run_event.close()