Pyper is a flexible framework for concurrent and parallel data processing in Python, built on functional programming patterns. It is intended for ETL systems, data microservices, and data collection tools, and it wraps threading, multiprocessing, and asynchronous execution behind a single, intuitive API.
Key Features
- Intuitive API: Designed for ease of understanding and use, it offers clean abstractions that unify threaded, multiprocessed, and asynchronous workflows.
- Functional Paradigm: Plain Python functions are the building blocks, which encourages clean, reusable code.
- Safety: Pyper manages the underlying complexities of task execution, such as resource cleanup and error handling, so that concerns like race conditions and memory leaks are handled by the framework rather than by your code.
- Efficiency: Built for lazy execution, it employs queues, workers, and generators to optimize performance.
- Pure Python: A lightweight solution with no additional dependencies, so it integrates easily into existing projects.
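To make the "queues, workers, and generators" idea above concrete, here is a minimal standard-library sketch of a single lazy, multi-worker stage. It is not Pyper's implementation: the names `run_stage`, `feed`, `work`, and `_DONE` are hypothetical, and error handling is deliberately left out to keep the pattern visible.

```python
import queue
import threading
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
R = TypeVar("R")

_DONE = object()  # hypothetical sentinel marking the end of the stream


def run_stage(
    func: Callable[[T], R],
    items: Iterable[T],
    workers: int = 4,
    maxsize: int = 8,
) -> Iterator[R]:
    """Apply func to items on worker threads, yielding results as they finish.

    Results may arrive out of order. Exceptions raised by func are not
    handled in this sketch.
    """
    inbox: queue.Queue = queue.Queue(maxsize=maxsize)
    outbox: queue.Queue = queue.Queue(maxsize=maxsize)

    def feed() -> None:
        # put() blocks while the inbox is full, so the source iterable is
        # only consumed as fast as the workers can keep up.
        for item in items:
            inbox.put(item)
        for _ in range(workers):
            inbox.put(_DONE)  # one sentinel per worker

    def work() -> None:
        while True:
            item = inbox.get()
            if item is _DONE:
                outbox.put(_DONE)
                return
            outbox.put(func(item))

    threads = [threading.Thread(target=feed, daemon=True)]
    threads += [threading.Thread(target=work, daemon=True) for _ in range(workers)]
    for thread in threads:
        thread.start()

    finished = 0
    while finished < workers:
        result = outbox.get()
        if result is _DONE:
            finished += 1
        else:
            yield result


if __name__ == "__main__":
    # Sorting is needed because completion order is not guaranteed.
    print(sorted(run_stage(lambda x: x * x, range(10), workers=3)))
```

Because `run_stage` is a generator function, no threads are started until the caller begins iterating, and the bounded queues keep a fast producer from running far ahead of a slow consumer.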
Usage Example
Pyper makes it convenient to create data processing pipelines using the task decorator. Here is a streamlined example that builds a pipeline mixing asynchronous and synchronous tasks:
import asyncio
import time

from pyper import task


def get_data(limit: int):
    # Generator that produces the pipeline's input items
    for i in range(limit):
        yield i


async def step1(data: int):
    # Asynchronous, non-blocking wait
    await asyncio.sleep(1)
    print("Finished async wait", data)
    return data


def step2(data: int):
    # Synchronous, blocking wait
    time.sleep(1)
    print("Finished sync wait", data)
    return data


def step3(data: int):
    # CPU-bound work
    for i in range(10_000_000):
        _ = i * i
    print("Finished heavy computation", data)
    return data


async def main():
    # Chain the tasks with the | operator
    pipeline = (
        task(get_data, branch=True)
        | task(step1, workers=20)
        | task(step2, workers=20)
        | task(step3, workers=20, multiprocess=True)
    )
    total = 0
    async for output in pipeline(limit=20):
        total += output
    print("Total:", total)


if __name__ == "__main__":
    asyncio.run(main())
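In this example, get_data yields 20 integers, which pass through an asynchronous step (step1), a blocking synchronous step (step2), and a CPU-heavy step run with multiprocess=True (step3). Each step is configured with 20 workers, and the results are consumed with async for as they come out of the pipeline. Since every step returns its input unchanged, the printed total is the sum of 0 through 19, which is 190.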
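For readers curious how a `|` operator can chain functions into a pipeline at all, here is a minimal standard-library sketch of the idea using `__or__` and lazy generators. The `Stage` class and the `each` helper are hypothetical names for illustration only; they are not part of Pyper and say nothing about how Pyper implements its own operator or its workers.

```python
from typing import Any, Callable, Iterable, Iterator


class Stage:
    """Hypothetical pipeline stage: wraps a function from stream to stream."""

    def __init__(self, transform: Callable[[Iterable[Any]], Iterator[Any]]):
        self.transform = transform

    def __or__(self, other: "Stage") -> "Stage":
        # a | b builds a new stage that feeds a's output stream into b.
        return Stage(lambda items: other.transform(self.transform(items)))

    def __call__(self, items: Iterable[Any]) -> Iterator[Any]:
        return self.transform(items)


def each(func: Callable[[Any], Any]) -> Stage:
    """Lift a one-item function into a Stage that maps it lazily over a stream."""
    return Stage(lambda items: (func(item) for item in items))


# Compose two stages; nothing runs until the result is iterated.
pipeline = each(lambda x: x + 1) | each(lambda x: x * 2)
total = sum(pipeline(range(20)))

if __name__ == "__main__":
    print("Total:", total)
```

This sketch is purely sequential. The point is only that composition through `|` returns another pipeline object, which is why stages can be chained to any length before any data flows.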
For more detailed documentation on setting up and using Pyper, along with further features and examples, refer to the project's documentation.