Getting Started with Ceylon Playground
Ceylon is a distributed task processing framework that enables building scalable agent-based systems. This guide will walk you through creating your first Ceylon application using the Playground system.
Core Concepts
- Playground: A central coordinator that manages workers and task distribution
- ProcessWorker: Handles specific types of tasks
- Task: A unit of work with optional dependencies
- ProcessRequest/Response: Communication protocol between playground and workers
Basic Example: Text Processing System
Let's create a simple system that processes text. We'll build: 1. A text processor that multiplies input 2. An aggregator that combines results 3. A playground to coordinate them
import asyncio
from ceylon.processor.agent import ProcessWorker, ProcessRequest
from ceylon.task.manager import Task
from ceylon.task.playground import TaskProcessingPlayground
class TextProcessor(ProcessWorker):
"""Processes text-based tasks."""
async def _processor(self, request: ProcessRequest, time: int):
data = request.data
return data * 5
class AggregateProcessor(ProcessWorker):
"""Aggregates results from multiple tasks."""
async def _processor(self, request: ProcessRequest, time: int):
data = request.data or 0
for d in request.dependency_data.values():
data += d.output
return data
async def main():
# Initialize playground and workers
playground = TaskProcessingPlayground()
worker = TextProcessor("text_worker", role="multiply")
aggregate_worker = AggregateProcessor("aggregate_worker", role="aggregate")
async with playground.play(workers=[worker, aggregate_worker]) as active_playground:
# Create tasks
task1 = Task(
name="Process Data 1",
processor="multiply",
input_data={'data': 5}
)
task2 = Task(
name="Process Data 2",
processor="multiply",
input_data={'data': 10}
)
task3 = Task(
name="Aggregate Results",
processor="aggregate",
dependencies={task1.id, task2.id}
)
# Execute tasks
for task in [task1, task2, task3]:
result = await active_playground.add_and_execute_task(
task=task,
wait_for_completion=True
)
print(f"Task: {task.name}, Result: {result.output}")
if __name__ == "__main__":
asyncio.run(main())
Key Components Explained
1. ProcessWorker
- Base class for task processors
- Implements
_processor
method to handle specific task types - Can access request data and metadata
class CustomWorker(ProcessWorker):
async def _processor(self, request: ProcessRequest, time: int) -> tuple[Any, dict]:
result = process_data(request.data)
metadata = {"processed_at": time}
return result, metadata
2. TaskProcessingPlayground
- Manages worker connections
- Coordinates task execution
- Handles dependencies between tasks
playground = TaskProcessingPlayground(name="my_playground", port=8888)
async with playground.play(workers=[worker1, worker2]) as active_playground:
# Execute tasks here
3. Task
- Represents a unit of work
- Can specify dependencies on other tasks
- Contains input data and processor type
task = Task(
name="MyTask",
processor="worker_role", # Must match worker's role
input_data={'key': 'value'},
dependencies={other_task.id} # Optional dependencies
)
Advanced Features
1. Task Dependencies
Ceylon supports complex task dependencies:
task_a = Task(name="A", processor="role_1", input_data={'data': 1})
task_b = Task(name="B", processor="role_2", input_data={'data': 2})
task_c = Task(
name="C",
processor="role_3",
dependencies={task_a.id, task_b.id}
)
2. Error Handling
Workers can handle errors gracefully:
async def _processor(self, request: ProcessRequest, time: int):
try:
result = process_data(request.data)
return result, {"status": "success"}
except Exception as e:
raise Exception(f"Processing failed: {str(e)}")
3. Custom Metadata
Add metadata to track processing details:
async def _processor(self, request: ProcessRequest, time: int):
result = process_data(request.data)
metadata = {
"processing_time": time,
"data_size": len(request.data),
"processor_version": "1.0"
}
return result, metadata
Best Practices
-
Worker Design
- Keep workers focused on single responsibilities
- Use meaningful role names
- Include proper error handling
-
Task Management
- Break complex operations into smaller tasks
- Use clear naming conventions
- Carefully manage dependencies
-
Resource Handling
- Use async context managers for cleanup
- Implement proper shutdown procedures
- Monitor worker health
Common Patterns
1. Pipeline Processing
async def create_pipeline():
task1 = Task(name="Extract", processor="extractor")
task2 = Task(name="Transform", processor="transformer",
dependencies={task1.id})
task3 = Task(name="Load", processor="loader",
dependencies={task2.id})
return [task1, task2, task3]
2. Parallel Processing
tasks = [
Task(name=f"Process_{i}", processor="worker")
for i in range(5)
]
results = await playground.execute_task_group(tasks)
Debugging Tips
-
Enable detailed logging:
from loguru import logger logger.add("debug.log", level="DEBUG")
-
Monitor task states:
task_status = playground.task_manager.get_task(task_id).status print(f"Task {task_id} status: {task_status}")
-
Check worker connections:
connected_workers = playground.llm_agents print(f"Connected workers: {connected_workers}")
Next Steps
- Explore more complex task dependencies
- Implement custom error handling strategies
- Add monitoring and metrics collection
- Scale your system with additional workers
- Implement custom task scheduling logic
For more information, visit the Ceylon documentation at https://ceylon.ai