The ultimate resource for building intelligent agents and mastering modern AI/ML 🤖
From fundamentals to production-ready agentic systems. No fluff, just actionable content.
# Start with fundamentals
1. Complete Andrew Ng's ML Course → https://www.coursera.org/learn/machine-learning
2. Learn Python for AI → https://github.com/fastai/fastbook
3. Build your first agent → See "Project 2: Multi-Agent Research Team" below
4. Join communities → Discord links in Resources section

# Dive into modern AI
1. Master Transformers → https://huggingface.co/course/
2. Learn LangChain → Complete tutorial series below
3. Build multi-agent systems → CrewAI examples included
4. Read key papers → Research section has 50+ papers

# Production and research
1. MLOps with agents → Deployment guides included
2. Custom agent architectures → Architecture patterns below
3. Research contributions → Template for writing papers
4. Open source projects → 20+ project ideas included

| Course | Provider | Level | Duration | Focus |
|---|---|---|---|---|
| Machine Learning Specialization | Stanford/Coursera | Beginner | 3 months | ML Fundamentals |
| Deep Learning Specialization | DeepLearning.AI | Intermediate | 4 months | Neural Networks |
| Natural Language Processing | DeepLearning.AI | Intermediate | 3 months | NLP & Transformers |
| CS224N: NLP with Deep Learning | Stanford | Advanced | 1 semester | Research-level NLP |
| MIT 6.034 Artificial Intelligence | MIT OpenCourseWare | Intermediate | 1 semester | AI Fundamentals |
| Practical Deep Learning | fast.ai | Beginner-Int | 2 months | Practical Implementation |
| Reinforcement Learning Course | David Silver/UCL | Advanced | Self-paced | RL & Decision Making |
- Deep Learning by Ian Goodfellow - The bible of deep learning
- Pattern Recognition and Machine Learning by Christopher Bishop - Mathematical foundations
- The Elements of Statistical Learning by Hastie, Tibshirani, Friedman - Statistical ML theory
- Designing Machine Learning Systems by Chip Huyen - Production ML systems
- Generative Deep Learning by David Foster - GANs, VAEs, Transformers
- Natural Language Processing with Python by Steven Bird - Practical NLP
- Artificial Intelligence: A Modern Approach by Russell & Norvig - Agent theory
- Reinforcement Learning: An Introduction by Sutton & Barto - Decision making
- Multi-Agent Systems by Gerhard Weiss - Multi-agent coordination
<details>
<summary><b>📖 Fundamentals</b></summary>
- Perceptrons & Multi-layer Networks: Start with basic building blocks
- Backpropagation: The learning algorithm that powers everything
- Activation Functions: ReLU, Sigmoid, Tanh, and modern variants
- Optimization: SGD, Adam, RMSprop, learning rate scheduling
- Regularization: Dropout, batch normalization, weight decay
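As a concrete companion to these bullet points, here is a minimal "from scratch" network: a two-layer NumPy MLP trained with hand-written backpropagation on XOR. The layer sizes, learning rate, and task are illustrative assumptions, not a production recipe.
```python
# Simple Neural Network from Scratch (Educational)
import numpy as np

class NeuralNetwork:
    """Tiny 2-layer MLP with sigmoid activations and manual backprop."""
    def __init__(self, n_in=2, n_hidden=4, n_out=1, lr=0.5):
        rng = np.random.default_rng(0)
        self.W1 = rng.normal(0.0, 1.0, (n_in, n_hidden))
        self.b1 = np.zeros(n_hidden)
        self.W2 = rng.normal(0.0, 1.0, (n_hidden, n_out))
        self.b2 = np.zeros(n_out)
        self.lr = lr

    @staticmethod
    def _sigmoid(x):
        return 1.0 / (1.0 + np.exp(-x))

    def forward(self, X):
        self.h = self._sigmoid(X @ self.W1 + self.b1)       # hidden activations
        self.y = self._sigmoid(self.h @ self.W2 + self.b2)  # output
        return self.y

    def train_step(self, X, target):
        y = self.forward(X)
        # Backprop for squared-error loss through the sigmoid outputs
        d_out = (y - target) * y * (1 - y)
        d_hid = (d_out @ self.W2.T) * self.h * (1 - self.h)
        self.W2 -= self.lr * self.h.T @ d_out
        self.b2 -= self.lr * d_out.sum(axis=0)
        self.W1 -= self.lr * X.T @ d_hid
        self.b1 -= self.lr * d_hid.sum(axis=0)
        return float(((y - target) ** 2).mean())

# Toy problem: XOR, which a single linear layer cannot solve
X = np.array([[0, 0], [0, 1], [1, 0], [1, 1]], dtype=float)
t = np.array([[0], [1], [1], [0]], dtype=float)
net = NeuralNetwork()
for epoch in range(5000):
    loss = net.train_step(X, t)
print(f"final loss: {loss:.4f}")
```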
The next block moves up the stack to a retrieval-augmented generation (RAG) system. The `__init__` wiring shown here is an assumed minimal setup (Chroma vector store, MiniLM embeddings, GPT-2 as a small generator) for the methods that follow.
```python
# Advanced RAG System (Educational)
import chromadb
import torch
from sentence_transformers import SentenceTransformer
from transformers import AutoModelForCausalLM, AutoTokenizer

class AdvancedRAGSystem:
    def __init__(self):
        # Assumed minimal setup for the methods below
        self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
        self.collection = chromadb.Client().create_collection(name="documents")
        self.tokenizer = AutoTokenizer.from_pretrained("gpt2")
        self.generator = AutoModelForCausalLM.from_pretrained("gpt2")
def add_documents(self, documents):
"""Add documents to the vector database"""
for i, doc in enumerate(documents):
embedding = self.embedding_model.encode([doc['content']])[0]
self.collection.add(
embeddings=[embedding.tolist()],
documents=[doc['content']],
metadatas=[doc.get('metadata', {})],
ids=[f"doc_{i}"]
)
def retrieve(self, query, k=5):
"""Retrieve relevant documents for a query"""
query_embedding = self.embedding_model.encode([query])[0]
results = self.collection.query(
query_embeddings=[query_embedding.tolist()],
n_results=k
)
return results['documents'][0]
def generate_response(self, query, context_docs):
"""Generate response using retrieved context"""
context = "\n".join(context_docs[:3]) # Use top 3 docs
prompt = f"Context: {context}\n\nQuestion: {query}\n\nAnswer:"
inputs = self.tokenizer.encode(prompt, return_tensors='pt')
with torch.no_grad():
outputs = self.generator.generate(
inputs,
max_length=inputs.shape[1] + 150,
temperature=0.7,
pad_token_id=self.tokenizer.eos_token_id
)
response = self.tokenizer.decode(outputs[0][inputs.shape[1]:], skip_special_tokens=True)
return response.strip()
def query(self, question):
"""Complete RAG pipeline"""
# Retrieve relevant documents
relevant_docs = self.retrieve(question)
# Generate response with context
response = self.generate_response(question, relevant_docs)
return {
'answer': response,
'sources': relevant_docs,
'confidence': self.calculate_confidence(question, relevant_docs)
}
def calculate_confidence(self, query, docs):
"""Calculate confidence score for the response"""
# Simple similarity-based confidence
query_embedding = self.embedding_model.encode([query])[0]
doc_embeddings = self.embedding_model.encode(docs)
similarities = [
torch.cosine_similarity(
torch.tensor(query_embedding),
torch.tensor(doc_emb),
dim=0
).item()
for doc_emb in doc_embeddings
]
return max(similarities) if similarities else 0.0
# Usage example
rag_system = AdvancedRAGSystem()
# Add sample documents
documents = [
{"content": "Machine learning is a subset of AI that focuses on algorithms that can learn from data."},
{"content": "Deep learning uses neural networks with multiple layers to model complex patterns."},
{"content": "Transformers are a type of neural network architecture that uses attention mechanisms."}
]
rag_system.add_documents(documents)
# Query the system
result = rag_system.query("What is machine learning?")
print(f"Answer: {result['answer']}")
print(f"Confidence: {result['confidence']:.2f}")- Hybrid Retrieval (Dense + Sparse):
from rank_bm25 import BM25Okapi
class HybridRetriever:
def __init__(self, documents):
self.documents = documents
self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
# BM25 for sparse retrieval
tokenized_docs = [doc.split() for doc in documents]
self.bm25 = BM25Okapi(tokenized_docs)
# Dense embeddings
self.doc_embeddings = self.embedding_model.encode(documents)
def retrieve(self, query, k=5, alpha=0.5):
"""Combine dense and sparse retrieval"""
# BM25 scores
bm25_scores = self.bm25.get_scores(query.split())
# Dense similarity scores
query_embedding = self.embedding_model.encode([query])[0]
dense_scores = [
torch.cosine_similarity(
torch.tensor(query_embedding),
torch.tensor(doc_emb),
dim=0
).item()
for doc_emb in self.doc_embeddings
]
# Combine scores
combined_scores = [
alpha * dense + (1 - alpha) * sparse
for dense, sparse in zip(dense_scores, bm25_scores)
]
# Get top k documents
top_indices = sorted(range(len(combined_scores)),
key=lambda i: combined_scores[i], reverse=True)[:k]
        return [self.documents[i] for i in top_indices]
```

- **Multi-Query RAG:**
```python
from transformers import AutoModelForCausalLM, AutoTokenizer
class MultiQueryRAG:
def __init__(self, rag_system):
self.rag_system = rag_system
self.query_generator = AutoModelForCausalLM.from_pretrained("gpt2")
self.query_tokenizer = AutoTokenizer.from_pretrained("gpt2")
def generate_similar_queries(self, original_query, num_queries=3):
"""Generate multiple similar queries for better retrieval"""
prompt = f"Generate {num_queries} similar questions to: {original_query}\n"
inputs = self.query_tokenizer.encode(prompt, return_tensors='pt')
outputs = self.query_generator.generate(inputs, max_length=100)
generated_text = self.query_tokenizer.decode(outputs[0], skip_special_tokens=True)
# Parse and return queries (simplified)
return [original_query] # Would contain parsed queries
def retrieve_multi_query(self, query):
"""Retrieve using multiple query variants"""
queries = self.generate_similar_queries(query)
all_docs = []
for q in queries:
docs = self.rag_system.retrieve(q)
all_docs.extend(docs)
# Remove duplicates and rank
unique_docs = list(set(all_docs))
return unique_docs[:5] # Return top 5
```

#### **Modern Frameworks:**
- **PyTorch**: `pip install torch torchvision`
- **TensorFlow**: `pip install tensorflow`
- **JAX**: `pip install jax jaxlib` (for research)
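A one-minute sanity check that an install works; the tiny autograd example below is PyTorch-specific, but TensorFlow and JAX have direct equivalents.

```python
import torch

x = torch.tensor([1.0, 2.0, 3.0], requires_grad=True)
loss = (x ** 2).sum()   # simple scalar function of x
loss.backward()         # autograd computes d(loss)/dx = 2x
print(loss.item(), x.grad)  # 14.0 tensor([2., 4., 6.])
```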
</details>
### **Transformers & Large Language Models**
<details>
<summary><b>🔥 Transformer Architecture Deep Dive</b></summary>
#### **Understanding Transformers:**
1. **Attention Mechanism**: "Attention is All You Need" - learn this first
2. **Self-Attention**: How tokens relate to each other
3. **Multi-Head Attention**: Parallel attention mechanisms
4. **Positional Encoding**: How transformers understand sequence order
5. **Feed-Forward Networks**: The other half of transformer blocks
#### **Hands-On Transformer Implementation:**
```python
# Simplified Transformer Block (Educational)
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
class MultiHeadAttention(nn.Module):
def __init__(self, d_model, num_heads):
super().__init__()
self.d_model = d_model
self.num_heads = num_heads
self.d_k = d_model // num_heads
self.W_q = nn.Linear(d_model, d_model)
self.W_k = nn.Linear(d_model, d_model)
self.W_v = nn.Linear(d_model, d_model)
self.W_o = nn.Linear(d_model, d_model)
def forward(self, query, key, value, mask=None):
batch_size = query.size(0)
# Linear transformations and reshape
Q = self.W_q(query).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
K = self.W_k(key).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
V = self.W_v(value).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
# Attention
scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
if mask is not None:
scores = scores.masked_fill(mask == 0, -1e9)
attention_weights = F.softmax(scores, dim=-1)
context = torch.matmul(attention_weights, V)
# Concatenate heads
context = context.transpose(1, 2).contiguous().view(
batch_size, -1, self.d_model)
return self.W_o(context)
```

- GPT Family: Text generation, completion
- BERT Family: Understanding, classification
- T5: Text-to-text transfer
- Code Models: CodeT5, CodeBERT for programming
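A quick way to feel the difference between these families is the Hugging Face `pipeline` API. The checkpoints below are small public ones chosen for illustration:

```python
from transformers import pipeline

# Decoder-only (GPT-style): free-form generation
generator = pipeline("text-generation", model="gpt2")
print(generator("Agents are", max_new_tokens=20)[0]["generated_text"])

# Encoder-only (BERT-style): understanding tasks such as fill-in-the-blank
unmasker = pipeline("fill-mask", model="bert-base-uncased")
print(unmasker("Transformers use [MASK] mechanisms.")[0]["token_str"])

# Encoder-decoder (T5-style): text-to-text transfer
translator = pipeline("translation_en_to_fr", model="t5-small")
print(translator("Attention is all you need.")[0]["translation_text"])
```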
- Hugging Face Transformers - Model hub and library
- Parameter-Efficient Fine-tuning - LoRA, AdaLoRA, etc.
- Unsloth - Fast fine-tuning library
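As a sketch of what parameter-efficient fine-tuning looks like with the `peft` library: `target_modules` is model-specific, and the values here assume a GPT-2-style checkpoint.

```python
from peft import LoraConfig, get_peft_model
from transformers import AutoModelForCausalLM

model = AutoModelForCausalLM.from_pretrained("gpt2")
config = LoraConfig(
    r=8,                        # low-rank adapter dimension
    lora_alpha=16,              # scaling factor
    target_modules=["c_attn"],  # GPT-2's fused attention projection
    lora_dropout=0.05,
    task_type="CAUSAL_LM",
)
model = get_peft_model(model, config)
model.print_trainable_parameters()  # typically well under 1% of all weights
```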
</details>

<details>
<summary><b>👁️ Vision Systems</b></summary>
- CNNs: ResNet, EfficientNet, Vision Transformers
- Object Detection: YOLO, R-CNN family, DETR
- Segmentation: U-Net, Mask R-CNN, Segment Anything
- Generative: Diffusion Models, GANs, VAEs
- CLIP: Image-text understanding
- DALL-E/Midjourney: Text-to-image generation
- GPT-4V: Vision-language reasoning
- LLaVA: Open-source vision-language model
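For example, CLIP scores image-text similarity in a few lines (the checkpoint is OpenAI's standard public release; `your_image.jpg` is a placeholder):

```python
from PIL import Image
from transformers import CLIPModel, CLIPProcessor

model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

image = Image.open("your_image.jpg")
texts = ["a photo of a cat", "a photo of a dog"]
inputs = processor(text=texts, images=image, return_tensors="pt", padding=True)
outputs = model(**inputs)
probs = outputs.logits_per_image.softmax(dim=-1)  # image-text match probabilities
print(dict(zip(texts, probs[0].tolist())))
```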
```python
# Image Classification with Vision Transformer
from transformers import ViTImageProcessor, ViTForImageClassification
from PIL import Image
# Load pre-trained model
processor = ViTImageProcessor.from_pretrained('google/vit-base-patch16-224')
model = ViTForImageClassification.from_pretrained('google/vit-base-patch16-224')
# Process image
image = Image.open('your_image.jpg')
inputs = processor(images=image, return_tensors="pt")
outputs = model(**inputs)
predictions = outputs.logits.argmax(-1)
```

</details>

## 🏗️ Agent Architectures & Design Patterns
- **Reactive Agents**
  - Respond directly to environment stimuli
  - No internal state or planning
  - Fast but limited reasoning
- **Deliberative Agents**
  - Maintain internal world model
  - Plan before acting
  - Better for complex tasks
- **Hybrid Agents**
  - Combine reactive and deliberative approaches
  - Layered architecture (reactive layer + planning layer)
  - Most practical for real applications
- **Learning Agents**
  - Improve performance over time
  - Use reinforcement learning or other adaptation methods
  - Can discover new strategies
```python
# Agent Architecture Template
class Agent:
def __init__(self):
self.memory = {} # Long-term and short-term memory
self.goals = [] # Current objectives
self.beliefs = {} # Knowledge about the world
self.tools = [] # Available actions/functions
self.reasoner = None # Planning and reasoning engine
def perceive(self, environment):
"""Process input from environment"""
pass
def reason(self):
"""Plan actions based on goals and beliefs"""
pass
def act(self, action):
"""Execute action in environment"""
pass
def learn(self, feedback):
"""Update beliefs and strategies"""
        pass
```

## 🧠 Advanced Reasoning Patterns

```python
# CoT Implementation Example
def chain_of_thought_solve(problem, llm):
prompt = f"""
Let's solve this step by step:
Problem: {problem}
Step 1: Understanding the problem
Step 2: Identifying key information
Step 3: Planning the solution approach
Step 4: Executing the solution
Step 5: Verifying the answer
Let me work through each step:
"""
response = llm.generate(prompt)
    return response
```

```python
# Tree of Thoughts for complex reasoning
class TreeOfThoughts:
def __init__(self, llm, max_depth=3):
self.llm = llm
self.max_depth = max_depth
    def generate_thoughts(self, problem, current_path=None):
        current_path = current_path or []  # avoid a mutable default argument
        if len(current_path) >= self.max_depth:
            return self.evaluate_solution(current_path)
# Generate multiple thought branches
thoughts = self.llm.generate_thoughts(problem, current_path)
best_thought = None
best_score = -1
for thought in thoughts:
new_path = current_path + [thought]
score = self.evaluate_thought(thought, problem)
if score > best_score:
best_thought = thought
best_score = score
        return self.generate_thoughts(problem, current_path + [best_thought])
```

```python
# ReAct Pattern Implementation
class ReActAgent:
def __init__(self, llm, tools):
self.llm = llm
self.tools = {tool.name: tool for tool in tools}
def solve(self, task):
thoughts = []
actions = []
observations = []
while not self.is_complete(task, thoughts, actions, observations):
# Think
thought = self.think(task, thoughts, actions, observations)
thoughts.append(thought)
# Act (if thought suggests an action)
if self.should_act(thought):
action = self.extract_action(thought)
if action['tool'] in self.tools:
result = self.tools[action['tool']].run(action['input'])
actions.append(action)
observations.append(result)
        return self.synthesize_answer(thoughts, actions, observations)
```

## 🧠 Agent Memory Architectures
- Short-term Memory: Current conversation context
- Long-term Memory: Persistent knowledge and experiences
- Working Memory: Active information for current task
- Episodic Memory: Specific experiences and events
- Semantic Memory: General knowledge and facts
```python
# Comprehensive Memory System
class AgentMemory:
def __init__(self, vector_db, graph_db):
self.vector_db = vector_db # For semantic similarity
self.graph_db = graph_db # For structured relationships
self.working_memory = {} # Current task context
self.conversation_buffer = [] # Recent interactions
def store_experience(self, experience):
"""Store new experience with embeddings and relationships"""
# Create vector embedding
embedding = self.embed_text(experience['content'])
# Store in vector database
self.vector_db.add(
id=experience['id'],
embedding=embedding,
metadata=experience
)
# Extract and store relationships
entities = self.extract_entities(experience['content'])
self.graph_db.add_relationships(entities)
def retrieve_relevant(self, query, k=5):
"""Retrieve relevant memories for current context"""
# Vector similarity search
similar_memories = self.vector_db.search(query, k=k)
# Graph traversal for related concepts
entities = self.extract_entities(query)
related_memories = self.graph_db.find_related(entities)
# Combine and rank results
        return self.rank_memories(similar_memories + related_memories)
```

## 👥 Collaborative Agent Networks
- Hierarchical: Manager-worker relationships
- Peer-to-Peer: Equal agents collaborating
- Market-based: Agents bid for tasks
- Blackboard: Shared knowledge space
```python
# Multi-Agent Communication System
class MultiAgentSystem:
def __init__(self):
self.agents = {}
self.message_queue = []
self.coordination_protocol = None
def add_agent(self, agent):
self.agents[agent.id] = agent
agent.register_system(self)
def broadcast_message(self, sender_id, message):
"""Send message to all other agents"""
for agent_id, agent in self.agents.items():
if agent_id != sender_id:
agent.receive_message(sender_id, message)
def send_message(self, sender_id, recipient_id, message):
"""Send direct message between agents"""
if recipient_id in self.agents:
self.agents[recipient_id].receive_message(sender_id, message)
def coordinate_task(self, task):
"""Coordinate task execution among agents"""
# Decompose task into subtasks
subtasks = self.decompose_task(task)
# Assign subtasks to appropriate agents
assignments = self.assign_subtasks(subtasks)
# Monitor and coordinate execution
results = self.execute_coordinated(assignments)
        return self.synthesize_results(results)
```

## 🤖 Responsible AI: Ethics, Hallucination Control & HITL
As we build more autonomous and intelligent agentic systems, ensuring ethical behavior, safety, and alignment becomes critically important. This section highlights best practices for responsible development and deployment of AI agents.
- Transparency: Clearly indicate when users are interacting with an AI system. Never present agents as human entities.
- Data Privacy: Agents must not log, store, or transmit sensitive user data without explicit consent.
- Scope Control: Limit agent actions to well-defined domains to avoid unintended behaviors.
- Bias Mitigation: Be aware of biases in training data or models that may affect the fairness or neutrality of agent outputs.
⚠️ Example: If building an AI code reviewer, avoid suggesting insecure practices or biased hiring recommendations.
AI agents often generate convincing but inaccurate (hallucinated) information. This is a key safety concern, especially in domains like medicine, law, or finance.
- Retrieval-Augmented Generation (RAG): Always ground answers in factual documents.
- Source Referencing: Have agents cite sources for claims made.
- Confidence Scoring: Tag outputs with confidence levels or uncertainty estimates.
- Multi-agent Verification: Cross-check outputs using a secondary “fact-checker” agent.
**Retrieval-Augmented Generation (RAG)**:
```python
from langchain.chains import RetrievalQA
from langchain.vectorstores import FAISS
from langchain.embeddings.openai import OpenAIEmbeddings
retriever = FAISS.load_local("my_vector_db", OpenAIEmbeddings()).as_retriever()
qa = RetrievalQA.from_chain_type(llm=chat_model, retriever=retriever)  # chat_model: any LangChain LLM, e.g. ChatOpenAI()
response = qa.run("What is the latest paper on transformer agents?")
print(response)
```

**Confidence Scoring** (assumes the agent attaches a confidence score to each response):
```python
response = agent.generate_response(query)
if response.confidence < 0.7:
print("⚠️ Low confidence - recommend human review.")primary = main_agent.run(task)
checker = verifier_agent.run(f"Fact-check this: {primary}")
final_output = f"Primary: {primary}\nVerified: {checker}"
```

Human oversight is essential for controlling autonomous agents, especially in high-risk or dynamic environments.
- Approval Gates: Require human approval before executing sensitive actions (e.g., sending emails, making API calls).
- Feedback Loops: Let users provide feedback to improve future agent behavior.
- Escalation Triggers: Detect when agents are unsure or entering unfamiliar contexts—and escalate to a human operator.
**Approval Gates**:
```python
action = agent.plan_next_action()
if action.sensitive:
user_input = input(f"Agent wants to do: {action.description}. Approve? (y/n): ")
if user_input.lower() != "y":
print("Action aborted by user.")user_feedback = input("Was this helpful? (yes/no): ")
agent.log_feedback(user_feedback)
```

| Principle | Recommendation |
|---|---|
| Transparency | Disclose AI use and avoid impersonation |
| Privacy | Handle personal data ethically and securely |
| Accuracy | Ground outputs in facts and verifiable sources |
| HITL | Keep a human supervisor in key decision loops |
| Bias Awareness | Monitor for biased behavior and retrain if needed |
## 📈 Comprehensive Evaluation Framework
- Task Success Rate: Percentage of successfully completed tasks
- Response Quality: Relevance, accuracy, helpfulness scores
- Efficiency: Time to completion, token usage
- User Satisfaction: Human feedback scores
- Safety: Harmful content detection, bias metrics
```python
import json
import torch
from datetime import datetime
from typing import Dict, List, Any
class AgentEvaluator:
def __init__(self):
self.metrics = {
'task_success': [],
'response_quality': [],
'efficiency': [],
'safety': [],
'user_satisfaction': []
}
def evaluate_response(self,
query: str,
response: str,
expected_output: str = None,
user_feedback: int = None) -> Dict[str, float]:
"""Comprehensive response evaluation"""
evaluation = {}
# 1. Relevance Score (using embeddings)
evaluation['relevance'] = self.calculate_relevance(query, response)
# 2. Accuracy (if ground truth available)
if expected_output:
evaluation['accuracy'] = self.calculate_accuracy(response, expected_output)
# 3. Safety Score
evaluation['safety'] = self.evaluate_safety(response)
# 4. Coherence
evaluation['coherence'] = self.evaluate_coherence(response)
# 5. User Satisfaction (if available)
if user_feedback:
evaluation['user_satisfaction'] = user_feedback / 5.0 # Normalize to 0-1
return evaluation
def calculate_relevance(self, query: str, response: str) -> float:
"""Calculate semantic relevance between query and response"""
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('all-MiniLM-L6-v2')
query_emb = model.encode([query])
response_emb = model.encode([response])
similarity = torch.cosine_similarity(
torch.tensor(query_emb),
torch.tensor(response_emb)
).item()
return max(0, similarity) # Ensure non-negative
def calculate_accuracy(self, response: str, expected: str) -> float:
"""Calculate accuracy against expected output"""
# Simple token overlap for demonstration
response_tokens = set(response.lower().split())
expected_tokens = set(expected.lower().split())
if not expected_tokens:
return 0.0
overlap = len(response_tokens.intersection(expected_tokens))
return overlap / len(expected_tokens)
def evaluate_safety(self, response: str) -> float:
"""Evaluate response safety (placeholder implementation)"""
# In practice, use models like OpenAI's moderation API
harmful_keywords = ['hate', 'violence', 'illegal', 'harmful']
response_lower = response.lower()
harmful_count = sum(1 for keyword in harmful_keywords if keyword in response_lower)
safety_score = max(0, 1 - (harmful_count * 0.2))
return safety_score
def evaluate_coherence(self, response: str) -> float:
"""Evaluate response coherence and structure"""
# Simple heuristics (in practice, use more sophisticated methods)
sentences = response.split('.')
# Check for reasonable sentence length
avg_sentence_length = sum(len(s.split()) for s in sentences) / len(sentences)
length_score = min(1.0, avg_sentence_length / 15) # Normalize around 15 words
# Check for proper punctuation
punct_score = min(1.0, response.count('.') / max(1, len(sentences)))
return (length_score + punct_score) / 2
def track_efficiency(self, start_time: datetime, end_time: datetime,
tokens_used: int, task_complexity: str):
"""Track efficiency metrics"""
duration = (end_time - start_time).total_seconds()
efficiency_metrics = {
'duration': duration,
'tokens_per_second': tokens_used / max(duration, 1),
'complexity': task_complexity,
'timestamp': datetime.now()
}
self.metrics['efficiency'].append(efficiency_metrics)
def generate_report(self) -> Dict[str, Any]:
"""Generate comprehensive evaluation report"""
if not any(self.metrics.values()):
return {"error": "No evaluation data available"}
report = {
'summary': {},
'trends': {},
'recommendations': []
}
# Calculate averages
for metric_type, values in self.metrics.items():
if values:
if metric_type == 'efficiency':
report['summary'][metric_type] = {
'avg_duration': sum(v['duration'] for v in values) / len(values),
'avg_tokens_per_second': sum(v['tokens_per_second'] for v in values) / len(values)
}
else:
report['summary'][metric_type] = {
'average': sum(values) / len(values),
'min': min(values),
'max': max(values),
'count': len(values)
}
# Generate recommendations
if 'response_quality' in report['summary']:
avg_quality = report['summary']['response_quality']['average']
if avg_quality < 0.7:
report['recommendations'].append("Consider improving response quality through better prompting or fine-tuning")
return report
# Usage Example
evaluator = AgentEvaluator()
# Evaluate a response
evaluation = evaluator.evaluate_response(
query="What is machine learning?",
response="Machine learning is a subset of AI that enables computers to learn from data without explicit programming.",
user_feedback=4
)
print("Evaluation Results:", evaluation)class AgentABTester:
def __init__(self, agent_a, agent_b):
self.agent_a = agent_a
self.agent_b = agent_b
self.results = {'a': [], 'b': []}
def run_test(self, test_queries, num_iterations=100):
"""Run A/B test between two agents"""
import random
for _ in range(num_iterations):
query = random.choice(test_queries)
# Randomly assign to agent A or B
if random.random() < 0.5:
response = self.agent_a.process(query)
self.results['a'].append({
'query': query,
'response': response,
'timestamp': datetime.now()
})
else:
response = self.agent_b.process(query)
self.results['b'].append({
'query': query,
'response': response,
'timestamp': datetime.now()
})
def analyze_results(self):
"""Statistical analysis of A/B test results"""
# Implementation would include statistical significance testing
return {
'agent_a_performance': len(self.results['a']),
'agent_b_performance': len(self.results['b']),
'statistical_significance': 'p < 0.05' # Placeholder
        }
```

<details>
<summary><b>🤖 Build Your Own AI Assistant</b></summary>
Create a personal AI assistant that can:
- Answer questions using RAG
- Manage calendar and tasks
- Send emails and messages
- Browse the web for information
- Learn from user preferences
```python
import json
import openai
import sqlite3
from datetime import datetime, timedelta
import smtplib
from email.mime.text import MIMEText
import requests
from typing import List, Dict, Any
class PersonalAIAssistant:
def __init__(self, openai_api_key: str):
self.openai_client = openai.OpenAI(api_key=openai_api_key)
self.db_connection = sqlite3.connect('assistant.db')
self.setup_database()
self.tools = self.initialize_tools()
self.conversation_history = []
def setup_database(self):
"""Initialize SQLite database for storing information"""
cursor = self.db_connection.cursor()
# Tasks table
cursor.execute('''
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY,
title TEXT NOT NULL,
description TEXT,
due_date TEXT,
completed BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# User preferences
cursor.execute('''
CREATE TABLE IF NOT EXISTS preferences (
key TEXT PRIMARY KEY,
value TEXT
)
''')
# Conversation memory
cursor.execute('''
CREATE TABLE IF NOT EXISTS conversations (
id INTEGER PRIMARY KEY,
query TEXT,
response TEXT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
self.db_connection.commit()
def initialize_tools(self) -> List[Dict[str, Any]]:
"""Define available tools/functions"""
return [
{
"type": "function",
"function": {
"name": "add_task",
"description": "Add a new task to the user's task list",
"parameters": {
"type": "object",
"properties": {
"title": {"type": "string"},
"description": {"type": "string"},
"due_date": {"type": "string", "format": "date"}
},
"required": ["title"]
}
}
},
{
"type": "function",
"function": {
"name": "get_tasks",
"description": "Retrieve user's tasks, optionally filtered by completion status",
"parameters": {
"type": "object",
"properties": {
"completed": {"type": "boolean"}
}
}
}
},
{
"type": "function",
"function": {
"name": "web_search",
"description": "Search the web for current information",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string"}
},
"required": ["query"]
}
}
},
{
"type": "function",
"function": {
"name": "send_email",
"description": "Send an email to a specified recipient",
"parameters": {
"type": "object",
"properties": {
"to": {"type": "string"},
"subject": {"type": "string"},
"body": {"type": "string"}
},
"required": ["to", "subject", "body"]
}
}
},
{
"type": "function",
"function": {
"name": "set_preference",
"description": "Store a user preference",
"parameters": {
"type": "object",
"properties": {
"key": {"type": "string"},
"value": {"type": "string"}
},
"required": ["key", "value"]
}
}
}
]
def add_task(self, title: str, description: str = "", due_date: str = None):
"""Add a new task"""
cursor = self.db_connection.cursor()
cursor.execute(
"INSERT INTO tasks (title, description, due_date) VALUES (?, ?, ?)",
(title, description, due_date)
)
self.db_connection.commit()
return f"Task '{title}' added successfully!"
def get_tasks(self, completed: bool = None):
"""Retrieve tasks"""
cursor = self.db_connection.cursor()
if completed is None:
cursor.execute("SELECT * FROM tasks ORDER BY created_at DESC")
else:
cursor.execute(
"SELECT * FROM tasks WHERE completed = ? ORDER BY created_at DESC",
(completed,)
)
tasks = cursor.fetchall()
return [
{
'id': task[0],
'title': task[1],
'description': task[2],
'due_date': task[3],
'completed': task[4],
'created_at': task[5]
}
for task in tasks
]
def web_search(self, query: str):
"""Search the web (placeholder - would use actual search API)"""
# In practice, integrate with Google Custom Search API, Bing API, etc.
return f"Here are the search results for '{query}': [Placeholder results]"
def send_email(self, to: str, subject: str, body: str):
"""Send email (placeholder implementation)"""
# In practice, configure with actual SMTP settings
print(f"Email sent to {to}")
print(f"Subject: {subject}")
print(f"Body: {body}")
return "Email sent successfully!"
def set_preference(self, key: str, value: str):
"""Store user preference"""
cursor = self.db_connection.cursor()
cursor.execute(
"INSERT OR REPLACE INTO preferences (key, value) VALUES (?, ?)",
(key, value)
)
self.db_connection.commit()
return f"Preference {key} set to {value}"
def execute_function(self, function_name: str, arguments: Dict[str, Any]):
"""Execute a function based on name and arguments"""
function_map = {
'add_task': self.add_task,
'get_tasks': self.get_tasks,
'web_search': self.web_search,
'send_email': self.send_email,
'set_preference': self.set_preference
}
if function_name in function_map:
return function_map[function_name](**arguments)
else:
return f"Unknown function: {function_name}"
def process_query(self, query: str) -> str:
"""Process user query and return response"""
# Add to conversation history
self.conversation_history.append({"role": "user", "content": query})
# Get response from OpenAI with function calling
response = self.openai_client.chat.completions.create(
model="gpt-4",
messages=[
{
"role": "system",
"content": """You are a helpful personal AI assistant. You can:
1. Manage tasks and to-do lists
2. Search the web for information
3. Send emails
4. Store and recall user preferences
5. Have natural conversations
Always be helpful, concise, and proactive in suggesting useful actions."""
},
*self.conversation_history
],
tools=self.tools,
tool_choice="auto"
)
message = response.choices[0].message
# Handle function calls
if message.tool_calls:
# Execute function calls
for tool_call in message.tool_calls:
function_name = tool_call.function.name
function_args = json.loads(tool_call.function.arguments)
result = self.execute_function(function_name, function_args)
# Add function call and result to conversation
self.conversation_history.append({
"role": "assistant",
"content": None,
"tool_calls": [tool_call]
})
self.conversation_history.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": result
})
# Get final response with function results
final_response = self.openai_client.chat.completions.create(
model="gpt-4",
messages=self.conversation_history
)
assistant_message = final_response.choices[0].message.content
else:
assistant_message = message.content
# Add assistant response to history
self.conversation_history.append({"role": "assistant", "content": assistant_message})
# Store conversation in database
cursor = self.db_connection.cursor()
cursor.execute(
"INSERT INTO conversations (query, response) VALUES (?, ?)",
(query, assistant_message)
)
self.db_connection.commit()
return assistant_message
# Usage Example
if __name__ == "__main__":
assistant = PersonalAIAssistant("your-openai-api-key")
# Interactive loop
print("Personal AI Assistant ready! Type 'quit' to exit.")
while True:
user_input = input("\nYou: ")
if user_input.lower() == 'quit':
break
response = assistant.process_query(user_input)
print(f"Assistant: {response}")- Voice Interface: Add speech-to-text and text-to-speech
- Calendar Integration: Connect with Google Calendar API
- Smart Notifications: Proactive reminders and suggestions
- Learning System: Adapt responses based on user feedback
- Mobile App: Create a companion mobile application
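For the voice interface idea, a minimal sketch using the SpeechRecognition and pyttsx3 packages (assumed dependencies; `assistant` is the `PersonalAIAssistant` from above):

```python
import speech_recognition as sr  # pip install SpeechRecognition
import pyttsx3                   # pip install pyttsx3

recognizer = sr.Recognizer()
tts = pyttsx3.init()

def listen_once() -> str:
    """Capture one utterance from the microphone and transcribe it."""
    with sr.Microphone() as source:
        recognizer.adjust_for_ambient_noise(source)
        audio = recognizer.listen(source)
    return recognizer.recognize_google(audio)  # free web API, needs internet

def speak(text: str):
    tts.say(text)
    tts.runAndWait()

# query = listen_once()
# response = assistant.process_query(query)
# speak(response)
```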
</details>

<details>
<summary><b>🔬 Automated Research Pipeline</b></summary>
Build a team of specialized agents that can:
- Conduct comprehensive research on any topic
- Synthesize information from multiple sources
- Generate detailed reports with citations
- Fact-check and validate information
- Present findings in various formats
```python
from dataclasses import dataclass
from typing import List, Dict, Any
import json
import requests
from datetime import datetime
import asyncio
@dataclass
class ResearchArticle:
title: str
content: str
source: str
url: str
date_published: str
credibility_score: float
class ResearcherAgent:
def __init__(self, name: str, specialty: str, tools: List[str]):
self.name = name
self.specialty = specialty
self.tools = tools
self.findings = []
async def research_topic(self, topic: str, depth: str = "comprehensive") -> List[ResearchArticle]:
"""Conduct research on a given topic"""
print(f"{self.name} researching: {topic}")
# Simulate different research strategies based on specialty
if self.specialty == "academic":
return await self._academic_search(topic, depth)
elif self.specialty == "web_research":
return await self._web_search(topic, depth)
elif self.specialty == "data_analysis":
return await self._data_research(topic, depth)
else:
return await self._general_research(topic, depth)
async def _academic_search(self, topic: str, depth: str) -> List[ResearchArticle]:
"""Search academic databases and papers"""
# In practice, integrate with ArXiv, PubMed, Google Scholar APIs
papers = [
ResearchArticle(
title=f"Academic Study on {topic}",
content=f"Comprehensive academic analysis of {topic} with peer-reviewed findings...",
source="Academic Journal",
url="https://example.com/paper1",
date_published="2024-01-15",
credibility_score=0.95
)
]
return papers
async def _web_search(self, topic: str, depth: str) -> List[ResearchArticle]:
"""Search web sources and news"""
# Integrate with search APIs (Google, Bing, DuckDuckGo)
articles = [
ResearchArticle(
title=f"Recent Developments in {topic}",
content=f"Latest news and developments about {topic}...",
source="Tech News Site",
url="https://example.com/article1",
date_published="2024-02-20",
credibility_score=0.75
)
]
return articles
async def _data_research(self, topic: str, depth: str) -> List[ResearchArticle]:
"""Analyze data sources and statistics"""
datasets = [
ResearchArticle(
title=f"Statistical Analysis of {topic}",
content=f"Data-driven insights about {topic} based on recent datasets...",
source="Data Repository",
url="https://example.com/dataset1",
date_published="2024-02-01",
credibility_score=0.88
)
]
return datasets
async def _general_research(self, topic: str, depth: str) -> List[ResearchArticle]:
"""General purpose research"""
return [
ResearchArticle(
title=f"Overview of {topic}",
content=f"General information about {topic}...",
source="Encyclopedia",
url="https://example.com/overview",
date_published="2024-01-01",
credibility_score=0.80
)
]
class AnalystAgent:
def __init__(self, name: str):
self.name = name
def analyze_sources(self, articles: List[ResearchArticle]) -> Dict[str, Any]:
"""Analyze research sources for credibility and relevance"""
print(f"{self.name} analyzing {len(articles)} sources...")
analysis = {
'total_sources': len(articles),
'avg_credibility': sum(a.credibility_score for a in articles) / len(articles),
'source_types': {},
'date_range': {
'earliest': min(a.date_published for a in articles),
'latest': max(a.date_published for a in articles)
},
'recommendations': []
}
# Analyze source types
for article in articles:
source_type = article.source
analysis['source_types'][source_type] = analysis['source_types'].get(source_type, 0) + 1
# Generate recommendations
if analysis['avg_credibility'] < 0.7:
analysis['recommendations'].append("Consider finding more credible sources")
if len(set(a.source for a in articles)) < 3:
analysis['recommendations'].append("Diversify source types for better coverage")
return analysis
def synthesize_findings(self, articles: List[ResearchArticle]) -> str:
"""Synthesize information from multiple sources"""
print(f"{self.name} synthesizing findings...")
# Group articles by theme (simplified)
themes = {}
for article in articles:
# Simple keyword-based grouping (in practice, use more sophisticated NLP)
key_terms = article.title.split()[:3] # First 3 words as theme
theme = ' '.join(key_terms)
if theme not in themes:
themes[theme] = []
themes[theme].append(article)
# Generate synthesis
synthesis = "# Research Synthesis\n\n"
for theme, theme_articles in themes.items():
synthesis += f"## {theme}\n\n"
for article in theme_articles:
synthesis += f"- **{article.title}** ({article.source}): {article.content[:200]}...\n"
synthesis += "\n"
return synthesis
class WriterAgent:
def __init__(self, name: str):
self.name = name
def generate_report(self,
topic: str,
synthesis: str,
analysis: Dict[str, Any],
articles: List[ResearchArticle]) -> str:
"""Generate comprehensive research report"""
print(f"{self.name} writing report on {topic}...")
report = f"""# Research Report: {topic}
Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
## Executive Summary
This report presents comprehensive research findings on {topic}, synthesizing information from {analysis['total_sources']} sources with an average credibility score of {analysis['avg_credibility']:.2f}.
## Methodology
- **Sources Analyzed**: {analysis['total_sources']}
- **Source Types**: {', '.join(analysis['source_types'].keys())}
- **Date Range**: {analysis['date_range']['earliest']} to {analysis['date_range']['latest']}
- **Credibility Assessment**: Average score of {analysis['avg_credibility']:.2f}/1.0
## Key Findings
{synthesis}
## Source Analysis
{self._format_source_analysis(analysis)}
## Recommendations
{"".join(f"- {rec}\n" for rec in analysis['recommendations']) if analysis['recommendations'] else "No specific recommendations at this time."}
## References
{self._format_references(articles)}
---
*This report was generated by an AI research team and should be reviewed by human experts.*
"""
return report
def _format_source_analysis(self, analysis: Dict[str, Any]) -> str:
"""Format source analysis section"""
source_breakdown = ""
for source_type, count in analysis['source_types'].items():
percentage = (count / analysis['total_sources']) * 100
source_breakdown += f"- **{source_type}**: {count} sources ({percentage:.1f}%)\n"
return f"""
### Source Distribution
{source_breakdown}
### Quality Metrics
- Average Credibility Score: {analysis['avg_credibility']:.2f}/1.0
- Total Sources: {analysis['total_sources']}
- Date Coverage: {analysis['date_range']['earliest']} to {analysis['date_range']['latest']}
"""
def _format_references(self, articles: List[ResearchArticle]) -> str:
"""Format references in academic style"""
references = ""
for i, article in enumerate(articles, 1):
references += f"{i}. {article.title}. *{article.source}*. {article.date_published}. Available at: {article.url}\n"
return references
class FactCheckerAgent:
def __init__(self, name: str):
self.name = name
def verify_claims(self, report: str, articles: List[ResearchArticle]) -> Dict[str, Any]:
"""Verify factual claims in the report"""
print(f"{self.name} fact-checking report...")
# Extract claims (simplified - in practice, use NLP to extract factual statements)
claims = self._extract_claims(report)
verification_results = {
'total_claims': len(claims),
'verified_claims': [],
'unverified_claims': [],
'confidence_score': 0.0
}
for claim in claims:
verification = self._verify_single_claim(claim, articles)
if verification['verified']:
verification_results['verified_claims'].append(verification)
else:
verification_results['unverified_claims'].append(verification)
# Calculate overall confidence
if verification_results['total_claims'] > 0:
verified_count = len(verification_results['verified_claims'])
verification_results['confidence_score'] = verified_count / verification_results['total_claims']
return verification_results
def _extract_claims(self, report: str) -> List[str]:
"""Extract factual claims from report"""
# Simplified extraction - look for sentences with specific patterns
sentences = report.split('.')
claims = []
# Look for sentences that might contain factual claims
fact_indicators = ['shows that', 'indicates that', 'according to', 'research found', 'study reveals']
for sentence in sentences:
sentence = sentence.strip()
if any(indicator in sentence.lower() for indicator in fact_indicators):
claims.append(sentence)
return claims[:10] # Limit to 10 claims for demo
def _verify_single_claim(self, claim: str, articles: List[ResearchArticle]) -> Dict[str, Any]:
"""Verify a single claim against source articles"""
# Simple verification based on keyword matching
# In practice, use more sophisticated semantic matching
verification = {
'claim': claim,
'verified': False,
'supporting_sources': [],
'confidence': 0.0
}
claim_words = set(claim.lower().split())
for article in articles:
article_words = set(article.content.lower().split())
overlap = len(claim_words.intersection(article_words))
overlap_ratio = overlap / len(claim_words) if claim_words else 0
if overlap_ratio > 0.3: # 30% word overlap threshold
verification['supporting_sources'].append({
'source': article.source,
'title': article.title,
'overlap_ratio': overlap_ratio
})
if verification['supporting_sources']:
verification['verified'] = True
verification['confidence'] = max(source['overlap_ratio'] for source in verification['supporting_sources'])
return verification
class ResearchTeamOrchestrator:
def __init__(self):
self.researchers = [
ResearcherAgent("Dr. Academic", "academic", ["arxiv", "pubmed", "google_scholar"]),
ResearcherAgent("Web Explorer", "web_research", ["google", "bing", "news_api"]),
ResearcherAgent("Data Analyst", "data_analysis", ["kaggle", "data_gov", "world_bank"])
]
self.analyst = AnalystAgent("Chief Analyst")
self.writer = WriterAgent("Report Writer")
self.fact_checker = FactCheckerAgent("Fact Checker")
async def conduct_research(self, topic: str, depth: str = "comprehensive") -> Dict[str, Any]:
"""Orchestrate complete research process"""
print(f"🔬 Starting research on: {topic}")
print("=" * 50)
# Phase 1: Parallel research by all agents
print("📚 Phase 1: Conducting research...")
research_tasks = [
researcher.research_topic(topic, depth)
for researcher in self.researchers
]
research_results = await asyncio.gather(*research_tasks)
# Combine all articles
all_articles = []
for articles in research_results:
all_articles.extend(articles)
print(f"✅ Collected {len(all_articles)} sources")
# Phase 2: Analysis
print("\n🔍 Phase 2: Analyzing sources...")
analysis = self.analyst.analyze_sources(all_articles)
synthesis = self.analyst.synthesize_findings(all_articles)
# Phase 3: Report generation
print("\n📝 Phase 3: Writing report...")
report = self.writer.generate_report(topic, synthesis, analysis, all_articles)
# Phase 4: Fact checking
print("\n✓ Phase 4: Fact-checking...")
fact_check_results = self.fact_checker.verify_claims(report, all_articles)
print("🎉 Research complete!")
print("=" * 50)
return {
'topic': topic,
'report': report,
'articles': all_articles,
'analysis': analysis,
'fact_check': fact_check_results,
'metadata': {
'total_sources': len(all_articles),
'research_date': datetime.now().isoformat(),
'confidence_score': fact_check_results['confidence_score']
}
}
def export_results(self, results: Dict[str, Any], format: str = "markdown"):
"""Export research results in various formats"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"research_report_{results['topic'].replace(' ', '_')}_{timestamp}"
if format == "markdown":
with open(f"{filename}.md", "w", encoding="utf-8") as f:
f.write(results['report'])
print(f"📄 Report exported to {filename}.md")
elif format == "json":
# Export raw data
export_data = {
'topic': results['topic'],
'report': results['report'],
'sources': [
{
'title': article.title,
'source': article.source,
'url': article.url,
'credibility_score': article.credibility_score
}
for article in results['articles']
],
'analysis': results['analysis'],
'fact_check': results['fact_check'],
'metadata': results['metadata']
}
with open(f"{filename}.json", "w", encoding="utf-8") as f:
json.dump(export_data, f, indent=2)
print(f"📊 Data exported to {filename}.json")
# Usage Example
async def main():
# Initialize research team
research_team = ResearchTeamOrchestrator()
# Conduct research
results = await research_team.conduct_research(
topic="Large Language Models in Healthcare",
depth="comprehensive"
)
# Display summary
print(f"\n📋 RESEARCH SUMMARY")
print(f"Topic: {results['topic']}")
print(f"Sources Found: {results['metadata']['total_sources']}")
print(f"Confidence Score: {results['metadata']['confidence_score']:.2f}")
print(f"Verified Claims: {len(results['fact_check']['verified_claims'])}")
# Export results
research_team.export_results(results, format="markdown")
research_team.export_results(results, format="json")
return results
# Run the research
if __name__ == "__main__":
    results = asyncio.run(main())
```

**Extension Ideas:**
- Real API Integration: Connect to actual search APIs (Google, Bing, ArXiv); a minimal arXiv sketch follows this list
- Advanced NLP: Use transformer models for better claim extraction and verification
- Collaborative Filtering: Cross-verify information between multiple agents
- Visual Reports: Generate charts and graphs from data
- Continuous Learning: Improve research strategies based on feedback
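As one concrete starting point for real API integration, the public arXiv export API needs nothing beyond the standard library; the query fields below follow arXiv's documented Atom format.

```python
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET

def search_arxiv(query: str, max_results: int = 5):
    """Fetch paper titles and links from the public arXiv Atom API."""
    url = "http://export.arxiv.org/api/query?" + urllib.parse.urlencode({
        "search_query": f"all:{query}",
        "max_results": max_results,
    })
    with urllib.request.urlopen(url) as resp:
        feed = resp.read()
    ns = {"atom": "http://www.w3.org/2005/Atom"}
    root = ET.fromstring(feed)
    return [
        {
            "title": entry.findtext("atom:title", default="", namespaces=ns).strip(),
            "url": entry.findtext("atom:id", default="", namespaces=ns),
        }
        for entry in root.findall("atom:entry", ns)
    ]

for paper in search_arxiv("large language model agents"):
    print(paper["title"], "->", paper["url"])
```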
</details>

<details>
<summary><b>💻 AI-Powered Development Team</b></summary>
Create an AI development team that can:
- Review code for bugs, security issues, and best practices
- Generate documentation automatically
- Suggest improvements and refactoring
- Write unit tests
- Perform security audits
```python
import ast
from datetime import datetime
import re
import subprocess
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from pathlib import Path
import json
@dataclass
class CodeIssue:
type: str # 'bug', 'security', 'performance', 'style'
severity: str # 'high', 'medium', 'low'
line_number: int
description: str
suggestion: str
code_snippet: str
@dataclass
class CodeAnalysis:
file_path: str
issues: List[CodeIssue]
complexity_score: float
maintainability_score: float
test_coverage: float
suggestions: List[str]
class CodeReviewAgent:
def __init__(self, name: str, specialty: str):
self.name = name
self.specialty = specialty
self.rules = self._load_review_rules()
def _load_review_rules(self) -> Dict[str, List[str]]:
"""Load code review rules based on specialty"""
rules = {
'security': [
'SQL injection vulnerabilities',
'XSS vulnerabilities',
'Hardcoded credentials',
'Insecure random number generation',
'Unvalidated input'
],
'performance': [
'Inefficient loops',
'Memory leaks',
'Unnecessary database queries',
'Large object creation in loops',
'Blocking operations on main thread'
],
'style': [
'PEP 8 compliance',
'Naming conventions',
'Function length',
'Code duplication',
'Missing docstrings'
],
'bugs': [
'Null pointer exceptions',
'Index out of bounds',
'Type mismatches',
'Logic errors',
'Resource leaks'
]
}
return rules.get(self.specialty, [])
def review_code(self, file_path: str, code_content: str) -> CodeAnalysis:
"""Perform code review based on specialty"""
print(f"{self.name} reviewing {file_path}...")
issues = []
if self.specialty == 'security':
issues.extend(self._check_security_issues(code_content))
elif self.specialty == 'performance':
issues.extend(self._check_performance_issues(code_content))
elif self.specialty == 'style':
issues.extend(self._check_style_issues(code_content))
elif self.specialty == 'bugs':
issues.extend(self._check_bug_patterns(code_content))
# Calculate metrics
complexity = self._calculate_complexity(code_content)
maintainability = self._calculate_maintainability(code_content)
# Generate suggestions
suggestions = self._generate_suggestions(issues, code_content)
return CodeAnalysis(
file_path=file_path,
issues=issues,
complexity_score=complexity,
maintainability_score=maintainability,
test_coverage=0.0, # Would be calculated from test runner
suggestions=suggestions
)
def _check_security_issues(self, code: str) -> List[CodeIssue]:
"""Check for security vulnerabilities"""
issues = []
lines = code.split('\n')
for i, line in enumerate(lines, 1):
line_lower = line.lower().strip()
# Check for hardcoded credentials
if any(pattern in line_lower for pattern in ['password =', 'api_key =', 'secret =']):
if not line_lower.startswith('#'): # Ignore comments
issues.append(CodeIssue(
type='security',
severity='high',
line_number=i,
description='Potential hardcoded credential detected',
suggestion='Store credentials in environment variables or secure vault',
code_snippet=line.strip()
))
# Check for SQL injection patterns
if 'execute(' in line_lower and '+' in line and ('select' in line_lower or 'insert' in line_lower):
issues.append(CodeIssue(
type='security',
severity='high',
line_number=i,
description='Potential SQL injection vulnerability',
suggestion='Use parameterized queries or prepared statements',
code_snippet=line.strip()
))
# Check for eval() usage
if 'eval(' in line_lower:
issues.append(CodeIssue(
type='security',
severity='medium',
description='Use of eval() detected - potential code injection',
suggestion='Avoid eval() or use safer alternatives like ast.literal_eval()',
line_number=i,
code_snippet=line.strip()
))
return issues
def _check_performance_issues(self, code: str) -> List[CodeIssue]:
"""Check for performance problems"""
issues = []
lines = code.split('\n')
for i, line in enumerate(lines, 1):
line_stripped = line.strip()
# Check for inefficient string concatenation in loops
if 'for ' in line_stripped and i + 1 < len(lines):
next_line = lines[i].strip() if i < len(lines) else ""
if '+=' in next_line and any(quote in next_line for quote in ['"', "'"]):
issues.append(CodeIssue(
type='performance',
severity='medium',
line_number=i + 1,
description='Inefficient string concatenation in loop',
suggestion='Use list.append() and join(), or use f-strings',
code_snippet=f"{line_stripped}\n{next_line}"
))
# Check for inefficient list operations
if '.append(' in line_stripped and 'for ' in line_stripped:
issues.append(CodeIssue(
type='performance',
severity='low',
line_number=i,
description='Consider list comprehension for better performance',
suggestion='Use list comprehension instead of append in loop',
code_snippet=line_stripped
))
return issues
def _check_style_issues(self, code: str) -> List[CodeIssue]:
"""Check for style and formatting issues"""
issues = []
lines = code.split('\n')
for i, line in enumerate(lines, 1):
# Check line length
if len(line) > 100:
issues.append(CodeIssue(
type='style',
severity='low',
line_number=i,
description='Line too long (>100 characters)',
suggestion='Break long lines for better readability',
code_snippet=line[:50] + "..."
))
# Check for missing docstrings in functions
if line.strip().startswith('def ') and ':' in line:
# Look for docstring in next few lines
has_docstring = False
for j in range(i, min(i + 3, len(lines))):
if '"""' in lines[j] or "'''" in lines[j]:
has_docstring = True
break
if not has_docstring:
issues.append(CodeIssue(
type='style',
severity='medium',
line_number=i,
description='Function missing docstring',
suggestion='Add docstring describing function purpose and parameters',
code_snippet=line.strip()
))
return issues
def _check_bug_patterns(self, code: str) -> List[CodeIssue]:
"""Check for common bug patterns"""
issues = []
lines = code.split('\n')
for i, line in enumerate(lines, 1):
line_stripped = line.strip()
# Check for potential division by zero
if '/' in line_stripped and not '//' in line_stripped:
# Simple check for obvious patterns
if any(var in line_stripped for var in ['/0', '/ 0']):
issues.append(CodeIssue(
type='bugs',
severity='high',
line_number=i,
description='Potential division by zero',
suggestion='Add zero check before division',
code_snippet=line_stripped
))
# Check for potential index errors
if '[' in line_stripped and ']' in line_stripped:
if any(pattern in line_stripped for pattern in ['[-1]', '[len(']):
issues.append(CodeIssue(
type='bugs',
severity='medium',
line_number=i,
description='Potential index out of bounds',
suggestion='Validate index bounds before access',
code_snippet=line_stripped
))
return issues
def _calculate_complexity(self, code: str) -> float:
"""Calculate cyclomatic complexity"""
try:
tree = ast.parse(code)
complexity = 1 # Base complexity
for node in ast.walk(tree):
if isinstance(node, (ast.If, ast.For, ast.While, ast.Try)):
complexity += 1
elif isinstance(node, ast.BoolOp):
complexity += len(node.values) - 1
return min(complexity / 10.0, 1.0) # Normalize to 0-1
        except SyntaxError:
            return 0.5  # Default complexity if the file doesn't parse
def _calculate_maintainability(self, code: str) -> float:
"""Calculate maintainability score"""
lines = code.split('\n')
non_empty_lines = [line for line in lines if line.strip()]
if not non_empty_lines:
return 1.0
# Simple metrics
avg_line_length = sum(len(line) for line in non_empty_lines) / len(non_empty_lines)
comment_ratio = sum(1 for line in non_empty_lines if line.strip().startswith('#')) / len(non_empty_lines)
# Score based on readability factors
length_score = max(0, 1 - (avg_line_length - 50) / 100) # Penalty for long lines
comment_score = min(comment_ratio * 2, 0.3) # Bonus for comments, capped at 0.3
return max(0, min(1.0, length_score + comment_score))
def _generate_suggestions(self, issues: List[CodeIssue], code: str) -> List[str]:
"""Generate general improvement suggestions"""
suggestions = []
high_severity_count = sum(1 for issue in issues if issue.severity == 'high')
if high_severity_count > 0:
suggestions.append(f"Address {high_severity_count} high-severity issues immediately")
security_issues = [issue for issue in issues if issue.type == 'security']
if security_issues:
suggestions.append("Consider security audit and penetration testing")
if len(code.split('\n')) > 100:
suggestions.append("Consider breaking large file into smaller modules")
return suggestions
class DocumentationAgent:
def __init__(self, name: str):
self.name = name
def generate_documentation(self, file_path: str, code_content: str) -> str:
"""Generate documentation for code"""
print(f"{self.name} generating documentation for {file_path}...")
try:
tree = ast.parse(code_content)
except SyntaxError:
return "# Documentation Generation Failed\nSyntax error in source code."
docs = f"# Documentation for {Path(file_path).name}\n\n"
# Extract classes and functions
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef):
docs += self._document_function(node)
elif isinstance(node, ast.ClassDef):
docs += self._document_class(node)
return docs
def _document_function(self, node: ast.FunctionDef) -> str:
"""Generate documentation for a function"""
doc = f"## Function: `{node.name}`\n\n"
# Extract docstring if exists
if (ast.get_docstring(node)):
doc += f"**Description:** {ast.get_docstring(node)}\n\n"
# Parameters
if node.args.args:
doc += "**Parameters:**\n"
for arg in node.args.args:
doc += f"- `{arg.arg}`: Description needed\n"
doc += "\n"
# Return type (if annotated)
if node.returns:
doc += f"**Returns:** {ast.unparse(node.returns)}\n\n"
doc += "---\n\n"
return doc
def _document_class(self, node: ast.ClassDef) -> str:
"""Generate documentation for a class"""
doc = f"## Class: `{node.name}`\n\n"
if ast.get_docstring(node):
doc += f"**Description:** {ast.get_docstring(node)}\n\n"
# Methods
methods = [n for n in node.body if isinstance(n, ast.FunctionDef)]
if methods:
doc += "**Methods:**\n"
for method in methods:
doc += f"- `{method.name}()`: {ast.get_docstring(method) or 'Description needed'}\n"
doc += "\n"
doc += "---\n\n"
return doc
class TestGenerationAgent:
def __init__(self, name: str):
self.name = name
def generate_tests(self, file_path: str, code_content: str) -> str:
"""Generate unit tests for code"""
print(f"{self.name} generating tests for {file_path}...")
try:
tree = ast.parse(code_content)
except SyntaxError:
return "# Test generation failed due to syntax error"
test_code = f"""# Tests for {Path(file_path).name}
import unittest
from unittest.mock import Mock, patch
import sys
import os
# Add the source directory to path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
"""
# Find testable functions
functions = [node for node in ast.walk(tree) if isinstance(node, ast.FunctionDef)]
classes = [node for node in ast.walk(tree) if isinstance(node, ast.ClassDef)]
if functions:
test_code += self._generate_function_tests(functions)
if classes:
test_code += self._generate_class_tests(classes)
test_code += """
if __name__ == '__main__':
unittest.main()
"""
return test_code
def _generate_function_tests(self, functions: List[ast.FunctionDef]) -> str:
"""Generate tests for standalone functions"""
test_code = "\nclass TestFunctions(unittest.TestCase):\n\n"
for func in functions:
if not func.name.startswith('_'): # Skip private functions
test_code += f""" def test_{func.name}(self):
\"\"\"Test {func.name} function\"\"\"
# TODO: Implement test cases
# Test normal case
# Test edge cases
# Test error cases
pass
"""
return test_code
def _generate_class_tests(self, classes: List[ast.ClassDef]) -> str:
"""Generate tests for classes"""
test_code = ""
for cls in classes:
test_code += f"\nclass Test{cls.name}(unittest.TestCase):\n\n"
test_code += f""" def setUp(self):
\"\"\"Set up test fixtures\"\"\"
self.{cls.name.lower()} = {cls.name}()
"""
# Generate tests for methods
methods = [node for node in cls.body if isinstance(node, ast.FunctionDef)]
for method in methods:
if not method.name.startswith('_') or method.name == '__init__':
test_code += f""" def test_{method.name}(self):
\"\"\"Test {cls.name}.{method.name}\"\"\"
# TODO: Implement test for {method.name}
pass
"""
return test_code
class DevelopmentTeamOrchestrator:
def __init__(self):
self.code_reviewers = [
CodeReviewAgent("Security Expert", "security"),
CodeReviewAgent("Performance Analyst", "performance"),
CodeReviewAgent("Style Checker", "style"),
CodeReviewAgent("Bug Hunter", "bugs")
]
self.doc_generator = DocumentationAgent("Doc Writer")
self.test_generator = TestGenerationAgent("Test Engineer")
def review_codebase(self, directory_path: str) -> Dict[str, Any]:
"""Review entire codebase"""
print(f"🔍 Reviewing codebase: {directory_path}")
print("=" * 50)
results = {
'reviews': {},
'documentation': {},
'tests': {},
'summary': {
'total_files': 0,
'total_issues': 0,
'high_severity_issues': 0,
'average_complexity': 0.0,
'average_maintainability': 0.0
}
}
# Find Python files
python_files = list(Path(directory_path).glob("**/*.py"))
results['summary']['total_files'] = len(python_files)
complexities = []
maintainabilities = []
for file_path in python_files:
if file_path.name.startswith('.'):
continue
print(f"\n📁 Processing {file_path.name}...")
try:
code_content = file_path.read_text(encoding='utf-8')
except Exception as e:
print(f"❌ Error reading {file_path}: {e}")
continue
# Code reviews
file_reviews = {}
all_issues = []
for reviewer in self.code_reviewers:
analysis = reviewer.review_code(str(file_path), code_content)
file_reviews[reviewer.specialty] = analysis
all_issues.extend(analysis.issues)
complexities.append(analysis.complexity_score)
maintainabilities.append(analysis.maintainability_score)
results['reviews'][str(file_path)] = {
'analyses': file_reviews,
'total_issues': len(all_issues),
'high_severity_issues': len([i for i in all_issues if i.severity == 'high'])
}
# Generate documentation
documentation = self.doc_generator.generate_documentation(str(file_path), code_content)
results['documentation'][str(file_path)] = documentation
# Generate tests
tests = self.test_generator.generate_tests(str(file_path), code_content)
results['tests'][str(file_path)] = tests
results['summary']['total_issues'] += len(all_issues)
results['summary']['high_severity_issues'] += len([i for i in all_issues if i.severity == 'high'])
# Calculate averages
if complexities:
results['summary']['average_complexity'] = sum(complexities) / len(complexities)
if maintainabilities:
results['summary']['average_maintainability'] = sum(maintainabilities) / len(maintainabilities)
self._generate_summary_report(results)
return results
def _generate_summary_report(self, results: Dict[str, Any]):
"""Generate and save summary report"""
summary = results['summary']
report = f"""# Code Review Summary Report
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
## Overview
- **Total Files Reviewed**: {summary['total_files']}
- **Total Issues Found**: {summary['total_issues']}
- **High Severity Issues**: {summary['high_severity_issues']}
- **Average Complexity Score**: {summary['average_complexity']:.2f}/1.0
- **Average Maintainability Score**: {summary['average_maintainability']:.2f}/1.0
## Recommendations
"""
if summary['high_severity_issues'] > 0:
report += f"⚠️ **URGENT**: Address {summary['high_severity_issues']} high-severity issues immediately\n"
if summary['average_complexity'] > 0.7:
report += "🔧 Consider refactoring complex functions to improve maintainability\n"
if summary['average_maintainability'] < 0.5:
report += "📚 Improve code documentation and add more comments\n"
report += "\n## Next Steps\n"
report += "1. Review and fix high-severity security issues\n"
report += "2. Run generated unit tests to ensure functionality\n"
report += "3. Use generated documentation as a starting point\n"
report += "4. Consider code refactoring for complex functions\n"
# Save report
with open("code_review_summary.md", "w", encoding="utf-8") as f:
f.write(report)
print(f"\n📋 Summary Report Generated: code_review_summary.md")
# Usage Example
def main():
# Initialize development team
dev_team = DevelopmentTeamOrchestrator()
# Review codebase (replace with actual path)
codebase_path = "./sample_project" # Your project path
results = dev_team.review_codebase(codebase_path)
# Display summary
summary = results['summary']
print(f"\n🎯 REVIEW COMPLETE")
print(f"Files: {summary['total_files']}")
print(f"Issues: {summary['total_issues']} ({summary['high_severity_issues']} high severity)")
print(f"Complexity: {summary['average_complexity']:.2f}/1.0")
print(f"Maintainability: {summary['average_maintainability']:.2f}/1.0")
# Export detailed results
with open("detailed_review_results.json", "w") as f:
# Convert results to JSON-serializable format
json_results = {
'summary': results['summary'],
'file_count': len(results['reviews']),
'documentation_generated': len(results['documentation']),
'tests_generated': len(results['tests'])
}
json.dump(json_results, f, indent=2)
return results
if __name__ == "__main__":
    results = main()

- Integration with IDEs: VS Code/PyCharm plugins
- CI/CD Integration: Automated reviews on pull requests (see the gate script after this list)
- Machine Learning: Train models on codebases for better issue detection
- Real-time Collaboration: Live code review sessions
- Language Support: Extend to JavaScript, Java, C++, etc.
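As a taste of the CI/CD direction, here is a minimal gate script built on the orchestrator above (a sketch; the file name, exit-code convention, and default path are illustrative):

# ci_review.py - fail the build when high-severity issues are found (sketch)
import sys

def ci_gate(path: str = ".") -> int:
    team = DevelopmentTeamOrchestrator()
    results = team.review_codebase(path)
    high = results['summary']['high_severity_issues']
    print(f"High-severity issues: {high}")
    return 1 if high > 0 else 0  # Non-zero exit fails the CI job

if __name__ == "__main__":
    sys.exit(ci_gate(sys.argv[1] if len(sys.argv) > 1 else "."))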
🎮 RL-Powered Autonomous Agents
- Policy: Agent's strategy for choosing actions
- Value Functions: Estimate expected future rewards
- Q-Learning: Learn action-value functions (tabular sketch below)
- Policy Gradients: Directly optimize policy parameters
- Actor-Critic: Combine value functions and policy optimization
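Before the deep version below, here is the tabular Q-learning update in isolation (a toy sketch; the state/action counts and hyperparameters are arbitrary):

# Tabular Q-learning update rule (toy sketch)
import numpy as np

n_states, n_actions = 5, 2
Q = np.zeros((n_states, n_actions))
alpha, gamma = 0.1, 0.99  # learning rate, discount factor

def q_update(state: int, action: int, reward: float, next_state: int) -> None:
    """Q(s,a) <- Q(s,a) + alpha * (r + gamma * max_a' Q(s',a') - Q(s,a))"""
    td_target = reward + gamma * Q[next_state].max()
    Q[state, action] += alpha * (td_target - Q[state, action])

q_update(state=0, action=1, reward=1.0, next_state=2)
print(Q[0, 1])  # 0.1 after one update

The DQN below replaces the Q table with a neural network that approximates the same action-value function.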
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque, namedtuple
from typing import List, Tuple, Optional
# Experience replay for DQN
Experience = namedtuple('Experience', ['state', 'action', 'reward', 'next_state', 'done'])
class DQNAgent:
def __init__(self, state_size: int, action_size: int, lr: float = 0.001):
self.state_size = state_size
self.action_size = action_size
self.memory = deque(maxlen=10000)
self.epsilon = 1.0 # Exploration rate
self.epsilon_min = 0.01
self.epsilon_decay = 0.995
self.learning_rate = lr
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Neural networks
self.q_network_local = self._build_network().to(self.device)
self.q_network_target = self._build_network().to(self.device)
self.optimizer = optim.Adam(self.q_network_local.parameters(), lr=lr)
# Update target network
self.update_target_network()
def _build_network(self) -> nn.Module:
"""Build the Q-network"""
return nn.Sequential(
nn.Linear(self.state_size, 64),
nn.ReLU(),
nn.Linear(64, 64),
nn.ReLU(),
nn.Linear(64, self.action_size)
)
def remember(self, state, action, reward, next_state, done):
"""Store experience in replay memory"""
self.memory.append(Experience(state, action, reward, next_state, done))
def act(self, state, training=True):
"""Choose action using epsilon-greedy policy"""
if training and random.random() <= self.epsilon:
return random.choice(range(self.action_size))
state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
q_values = self.q_network_local(state_tensor)
return q_values.argmax().item()
def replay(self, batch_size=32):
"""Train the agent using experience replay"""
if len(self.memory) < batch_size:
return
# Sample random batch
batch = random.sample(self.memory, batch_size)
        # Stack into single arrays first; building a tensor from a list of arrays is slow
        states = torch.FloatTensor(np.array([e.state for e in batch])).to(self.device)
        actions = torch.LongTensor([e.action for e in batch]).to(self.device)
        rewards = torch.FloatTensor([e.reward for e in batch]).to(self.device)
        next_states = torch.FloatTensor(np.array([e.next_state for e in batch])).to(self.device)
        dones = torch.BoolTensor([e.done for e in batch]).to(self.device)
# Current Q values
current_q_values = self.q_network_local(states).gather(1, actions.unsqueeze(1))
# Next Q values from target network
next_q_values = self.q_network_target(next_states).max(1)[0].detach()
target_q_values = rewards + (0.99 * next_q_values * (~dones))
# Compute loss
loss = nn.MSELoss()(current_q_values.squeeze(), target_q_values)
# Optimize
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# Decay epsilon
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
def update_target_network(self):
"""Copy weights from local to target network"""
self.q_network_target.load_state_dict(self.q_network_local.state_dict())
# Advanced RL Environment for Agent Training
class AgentEnvironment:
def __init__(self, task_complexity=0.5):
self.task_complexity = task_complexity
self.state_size = 10
self.action_size = 4 # [research, analyze, communicate, execute]
self.current_state = self.reset()
self.max_steps = 100
self.step_count = 0
def reset(self):
"""Reset environment to initial state"""
self.step_count = 0
self.current_state = np.random.normal(0, 1, self.state_size)
return self.current_state
def step(self, action):
"""Execute action and return next state, reward, done"""
self.step_count += 1
# Simulate state transition based on action
if action == 0: # Research
reward = self._research_reward()
elif action == 1: # Analyze
reward = self._analyze_reward()
elif action == 2: # Communicate
reward = self._communicate_reward()
else: # Execute
reward = self._execute_reward()
# Update state
        # Out-of-place update so state arrays already stored in replay memory aren't mutated
        self.current_state = self.current_state + np.random.normal(0, 0.1, self.state_size)
        self.current_state = np.clip(self.current_state, -2, 2)
# Check if episode is done
done = self.step_count >= self.max_steps or reward > 0.9
return self.current_state, reward, done
def _research_reward(self):
"""Calculate reward for research action"""
# Higher reward for thorough research
return np.random.normal(0.3, 0.1)
def _analyze_reward(self):
"""Calculate reward for analysis action"""
return np.random.normal(0.4, 0.1)
def _communicate_reward(self):
"""Calculate reward for communication action"""
return np.random.normal(0.2, 0.1)
def _execute_reward(self):
"""Calculate reward for execution action"""
        return np.random.normal(0.5, 0.2)
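The pieces above are never wired together, so here is a minimal training loop showing how DQNAgent and AgentEnvironment interact (a sketch; the episode count, batch size, and sync cadence are illustrative):

# Minimal DQN training loop (sketch)
env = AgentEnvironment(task_complexity=0.5)
agent = DQNAgent(state_size=env.state_size, action_size=env.action_size)

for episode in range(200):
    state = env.reset()
    total_reward, done = 0.0, False
    while not done:
        action = agent.act(state)
        next_state, reward, done = env.step(action)
        agent.remember(state, action, reward, next_state, done)
        agent.replay(batch_size=32)  # Learn from a random minibatch
        state = next_state
        total_reward += reward
    if episode % 10 == 0:
        agent.update_target_network()  # Periodically sync the target network
        print(f"Episode {episode}: reward={total_reward:.2f}, epsilon={agent.epsilon:.3f}")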
🎨 Vision + Language Agents

import torch
from transformers import CLIPProcessor, CLIPModel, BlipProcessor, BlipForConditionalGeneration
from PIL import Image
import requests
import base64
from io import BytesIO
class MultimodalAgent:
def __init__(self):
# Vision-Language Models
self.clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
self.clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
self.blip_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")
self.blip_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.clip_model.to(self.device)
self.blip_model.to(self.device)
def analyze_image(self, image_path: str, query: str = None) -> dict:
"""Comprehensive image analysis"""
image = Image.open(image_path).convert('RGB')
results = {
'caption': self.generate_caption(image),
'objects': self.detect_objects(image),
'description': self.describe_image(image)
}
if query:
results['query_response'] = self.answer_about_image(image, query)
results['similarity_score'] = self.calculate_text_image_similarity(query, image)
return results
def generate_caption(self, image: Image.Image) -> str:
"""Generate descriptive caption for image"""
inputs = self.blip_processor(image, return_tensors="pt").to(self.device)
with torch.no_grad():
output = self.blip_model.generate(**inputs, max_length=50)
caption = self.blip_processor.decode(output[0], skip_special_tokens=True)
return caption
def detect_objects(self, image: Image.Image) -> list:
"""Detect objects in image using CLIP"""
object_queries = [
"a person", "a car", "a building", "a tree", "an animal",
"furniture", "food", "technology", "nature", "art"
]
inputs = self.clip_processor(
text=object_queries,
images=image,
return_tensors="pt",
padding=True
).to(self.device)
with torch.no_grad():
outputs = self.clip_model(**inputs)
probs = outputs.logits_per_image.softmax(dim=1)
detected_objects = []
for i, query in enumerate(object_queries):
confidence = probs[0][i].item()
if confidence > 0.1: # Threshold for detection
detected_objects.append({
'object': query,
'confidence': confidence
})
return sorted(detected_objects, key=lambda x: x['confidence'], reverse=True)
def answer_about_image(self, image: Image.Image, question: str) -> str:
"""Answer questions about image content"""
inputs = self.blip_processor(
image,
question,
return_tensors="pt"
).to(self.device)
with torch.no_grad():
output = self.blip_model.generate(
**inputs,
max_length=100,
num_beams=5,
early_stopping=True
)
answer = self.blip_processor.decode(output[0], skip_special_tokens=True)
return answer
def calculate_text_image_similarity(self, text: str, image: Image.Image) -> float:
"""Calculate similarity between text and image"""
inputs = self.clip_processor(
text=[text],
images=image,
return_tensors="pt",
padding=True
).to(self.device)
with torch.no_grad():
outputs = self.clip_model(**inputs)
similarity = torch.cosine_similarity(
outputs.text_embeds,
outputs.image_embeds
).item()
return similarity
def describe_image(self, image: Image.Image) -> str:
"""Generate detailed description"""
caption = self.generate_caption(image)
objects = self.detect_objects(image)
description = f"Image Description: {caption}\n\n"
description += "Detected elements:\n"
for obj in objects[:5]: # Top 5 objects
description += f"- {obj['object']}: {obj['confidence']:.2f} confidence\n"
return description
# Advanced Multimodal RAG System
class MultimodalRAGSystem:
def __init__(self):
self.multimodal_agent = MultimodalAgent()
self.image_database = {} # Store image embeddings
self.text_database = {} # Store text embeddings
def add_multimodal_document(self, text: str, image_path: str = None):
"""Add document with both text and image content"""
doc_id = len(self.text_database)
# Store text embedding
        text_inputs = self.multimodal_agent.clip_processor(
            text=[text], return_tensors="pt"
        ).to(self.multimodal_agent.device)  # Inputs must live on the model's device
        with torch.no_grad():
            text_embedding = self.multimodal_agent.clip_model.get_text_features(**text_inputs)
        self.text_database[doc_id] = {
            'content': text,
            'embedding': text_embedding.cpu().numpy(),  # .numpy() requires a CPU tensor
'image_path': image_path
}
# Store image embedding if provided
if image_path:
image = Image.open(image_path).convert('RGB')
            image_inputs = self.multimodal_agent.clip_processor(
                images=image, return_tensors="pt"
            ).to(self.multimodal_agent.device)
            with torch.no_grad():
                image_embedding = self.multimodal_agent.clip_model.get_image_features(**image_inputs)
            self.image_database[doc_id] = {
                'image_path': image_path,
                'embedding': image_embedding.cpu().numpy()
}
def multimodal_search(self, query: str, query_image: str = None, k: int = 5):
"""Search using both text and image queries"""
results = []
if query:
# Text-based search
            query_inputs = self.multimodal_agent.clip_processor(
                text=[query], return_tensors="pt"
            ).to(self.multimodal_agent.device)
            with torch.no_grad():
                # Keep the query embedding on CPU so it matches the stored embeddings
                query_embedding = self.multimodal_agent.clip_model.get_text_features(**query_inputs).cpu()
for doc_id, doc in self.text_database.items():
similarity = torch.cosine_similarity(
query_embedding,
torch.tensor(doc['embedding'])
).item()
results.append({
'doc_id': doc_id,
'content': doc['content'],
'similarity': similarity,
'type': 'text'
})
if query_image:
# Image-based search
image = Image.open(query_image).convert('RGB')
            image_inputs = self.multimodal_agent.clip_processor(
                images=image, return_tensors="pt"
            ).to(self.multimodal_agent.device)
            with torch.no_grad():
                query_embedding = self.multimodal_agent.clip_model.get_image_features(**image_inputs).cpu()
for doc_id, doc in self.image_database.items():
similarity = torch.cosine_similarity(
query_embedding,
torch.tensor(doc['embedding'])
).item()
results.append({
'doc_id': doc_id,
'image_path': doc['image_path'],
'similarity': similarity,
'type': 'image'
})
# Sort by similarity and return top k
results.sort(key=lambda x: x['similarity'], reverse=True)
        return results[:k]
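Neither class above is shown in action, so here is a short usage sketch ("photo.jpg" and "diagram.png" are placeholder paths you would supply):

# Example usage (sketch)
agent = MultimodalAgent()
analysis = agent.analyze_image("photo.jpg", query="What is the person doing?")
print(analysis['caption'])
print(analysis.get('query_response'))

rag = MultimodalRAGSystem()
rag.add_multimodal_document(
    "A diagram of the transformer architecture.", image_path="diagram.png"
)
for hit in rag.multimodal_search("attention mechanism", k=3):
    print(hit['type'], round(hit['similarity'], 3))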
- Attention Is All You Need (2017)
  - Introduced the Transformer architecture
  - Foundation for modern LLMs
- BERT: Pre-training of Deep Bidirectional Transformers (2018)
  - Bidirectional encoder representations
  - Changed the NLP landscape
- GPT: Improving Language Understanding by Generative Pre-Training (2018)
  - Generative pre-training approach
  - Started the GPT series
- ReAct: Synergizing Reasoning and Acting in Language Models (2022)
  - Reasoning + acting paradigm
  - Fundamental for tool-using agents
- Toolformer: Language Models Can Teach Themselves to Use Tools (2023)
  - Self-supervised tool learning
  - API-calling capabilities
- AutoGPT: An Autonomous GPT-4 Experiment (2023)
  - Autonomous task execution
  - Chain of thought + actions
- Constitutional AI: Harmlessness from AI Feedback (2022)
  - AI safety and alignment
  - Self-supervised harmlessness training
- Chain-of-Thought Prompting Elicits Reasoning in Large Language Models (2022)
  - Explicit reasoning steps
  - Improved problem-solving
- Tree of Thoughts: Deliberate Problem Solving with Large Language Models (2023)
  - Advanced reasoning framework
  - Tree search for problem solving
- HuggingGPT: Solving AI Tasks with ChatGPT and its Friends in Hugging Face (2023)
  - Multi-model collaboration
  - Task planning and execution
- Chatbot with Memory - Build a conversational agent that remembers context
- Document Q&A System - RAG system for PDF documents
- Code Explainer - Agent that explains code in natural language
- Task Scheduler Bot - Personal productivity assistant
- News Summarizer - Multi-source news aggregation and summarization
- Multi-Agent Debate System - Agents with different viewpoints discuss topics
- Code Review Assistant - Automated code quality analysis
- Research Paper Analyzer - Extract and synthesize research findings
- Smart Home Controller - IoT device management with natural language
- Financial Analysis Agent - Market research and investment recommendations
- Content Creation Pipeline - Blog posts, social media, video scripts
- Customer Service Bot - Handle complex customer inquiries
- Learning Tutor Agent - Personalized education and curriculum
- Autonomous Web Agent - Navigate and interact with websites
- Software Development Team - Multiple agents for full SDLC
- Scientific Research Assistant - Hypothesis generation and testing
- Creative Writing Collaborator - Co-author stories and screenplays
- Business Process Optimizer - Analyze and improve workflows
- Multi-Modal Content Analyzer - Process text, images, audio, video
- Distributed Agent Network - Coordinated agents across multiple servers
- API key management and rotation
- Input validation and sanitization (see the pydantic sketch after these checklists)
- Rate limiting and abuse protection
- Audit logging and monitoring
- Data encryption (at rest and in transit)
- Performance metrics (latency, throughput)
- Error tracking and alerting
- Cost monitoring (API usage, compute)
- User feedback collection
- A/B testing infrastructure
- Containerization (Docker/Kubernetes)
- Load balancing and auto-scaling
- Database optimization
- Caching strategies (Redis/Memcached)
- Backup and disaster recovery
- Unit tests for all components
- Integration testing
- Load testing
- Security testing
- User acceptance testing
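Each checklist item maps to concrete code. As one small example, input validation can be enforced at the API boundary with pydantic v2 (a sketch; SafeAgentRequest and its field limits are illustrative, not part of the server below):

# Input validation at the API boundary (sketch, pydantic v2)
from pydantic import BaseModel, Field, field_validator

class SafeAgentRequest(BaseModel):
    query: str = Field(..., min_length=1, max_length=4000)  # Reject empty/oversized input
    user_id: str = Field(..., pattern=r"^[A-Za-z0-9_-]{1,64}$")  # Constrain identifiers

    @field_validator("query")
    @classmethod
    def strip_control_chars(cls, v: str) -> str:
        # Drop non-printable control characters before the query reaches the agent
        return "".join(ch for ch in v if ch.isprintable() or ch in "\n\t")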
# Production-Ready Agent Server
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
import asyncio
import redis
import logging
from typing import Dict, Any
import uuid
from datetime import datetime
app = FastAPI(title="Production Agent API", version="1.0.0")
redis_client = redis.Redis(host='redis-server', port=6379, db=0)
# Request/Response Models
class AgentRequest(BaseModel):
query: str
user_id: str
session_id: str
context: Dict[str, Any] = {}
class AgentResponse(BaseModel):
response: str
confidence: float
sources: list
processing_time: float
request_id: str
# Production Agent with Monitoring
class ProductionAgent:
def __init__(self):
self.request_count = 0
self.error_count = 0
self.setup_logging()
def setup_logging(self):
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(__name__)
async def process_request(self, request: AgentRequest) -> AgentResponse:
request_id = str(uuid.uuid4())
start_time = datetime.now()
try:
self.request_count += 1
self.logger.info(f"Processing request {request_id} for user {request.user_id}")
# Check rate limiting
if not self.check_rate_limit(request.user_id):
raise HTTPException(status_code=429, detail="Rate limit exceeded")
# Process with agent (simplified)
response = await self.generate_response(request)
processing_time = (datetime.now() - start_time).total_seconds()
# Log metrics
self.logger.info(f"Request {request_id} completed in {processing_time:.2f}s")
return AgentResponse(
response=response,
confidence=0.85, # Would be calculated
sources=[],
processing_time=processing_time,
request_id=request_id
)
        except HTTPException:
            raise  # Let intentional HTTP errors (e.g., the 429 above) propagate unchanged
        except Exception as e:
            self.error_count += 1
            self.logger.error(f"Error processing request {request_id}: {str(e)}")
            raise HTTPException(status_code=500, detail="Internal server error")
def check_rate_limit(self, user_id: str, limit: int = 100, window: int = 3600) -> bool:
"""Check if user is within rate limits"""
key = f"rate_limit:{user_id}"
current = redis_client.get(key)
if current is None:
redis_client.setex(key, window, 1)
return True
if int(current) < limit:
redis_client.incr(key)
return True
return False
async def generate_response(self, request: AgentRequest) -> str:
"""Generate response (implement your agent logic here)"""
await asyncio.sleep(0.5) # Simulate processing
return f"Response to: {request.query}"
# Initialize agent
agent = ProductionAgent()
@app.post("/chat", response_model=AgentResponse)
async def chat_endpoint(request: AgentRequest):
"""Main chat endpoint"""
return await agent.process_request(request)
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {
"status": "healthy",
"requests_processed": agent.request_count,
"error_count": agent.error_count,
"timestamp": datetime.now().isoformat()
}
@app.get("/metrics")
async def metrics():
"""Metrics endpoint for monitoring"""
return {
"requests_total": agent.request_count,
"errors_total": agent.error_count,
"error_rate": agent.error_count / max(agent.request_count, 1),
"uptime": "calculated_uptime"
    }
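A quick smoke test for the endpoints above (a sketch using FastAPI's TestClient; it assumes the server code is saved in an importable module, here hypothetically named agent_server, and note that /chat additionally needs a reachable Redis):

# Smoke test for the health endpoint (sketch)
from fastapi.testclient import TestClient
from agent_server import app  # hypothetical module name for the code above

client = TestClient(app)

def test_health():
    resp = client.get("/health")
    assert resp.status_code == 200
    assert resp.json()["status"] == "healthy"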
🔗 Complete LangChain Tutorial

pip install langchain langchain-openai langchain-community langchainhub
pip install langgraph  # For graph-based workflows

from langchain.agents import create_openai_functions_agent, AgentExecutor
from langchain.tools import Tool
from langchain_openai import ChatOpenAI
from langchain import hub
# Define custom tools
def calculator(expression: str) -> str:
    """Evaluate mathematical expressions"""
    try:
        # Caution: eval() on untrusted input is unsafe; swap in a restricted
        # math parser before using this in production.
        return str(eval(expression))
    except Exception as e:
        return f"Error: {str(e)}"
def web_search(query: str) -> str:
"""Search the web for information"""
# Implementation would use actual search API
return f"Search results for: {query}"
# Create tools
tools = [
Tool(
name="Calculator",
func=calculator,
description="Useful for mathematical calculations"
),
Tool(
name="WebSearch",
func=web_search,
description="Search the internet for current information"
)
]
# Initialize LLM
llm = ChatOpenAI(model="gpt-4", temperature=0)
# Get prompt template
prompt = hub.pull("hwchase17/openai-functions-agent")
# Create agent
agent = create_openai_functions_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
# Use the agent
result = agent_executor.invoke({
"input": "What's 15 * 23 and then search for information about that number"
})

from langgraph.graph import StateGraph, END
from typing import TypedDict
class AgentState(TypedDict):
input: str
chat_history: list
intermediate_steps: list
output: str
def research_node(state: AgentState):
"""Research information about the topic"""
# Implementation for research
return {"intermediate_steps": state["intermediate_steps"] + ["research_done"]}
def analyze_node(state: AgentState):
"""Analyze the researched information"""
# Implementation for analysis
return {"intermediate_steps": state["intermediate_steps"] + ["analysis_done"]}
def synthesize_node(state: AgentState):
"""Synthesize final response"""
# Implementation for synthesis
return {"output": "Final synthesized response"}
# Build the graph
workflow = StateGraph(AgentState)
workflow.add_node("research", research_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("synthesize", synthesize_node)
workflow.set_entry_point("research")
workflow.add_edge("research", "analyze")
workflow.add_edge("analyze", "synthesize")
workflow.add_edge("synthesize", END)
app = workflow.compile()
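A compiled graph is invoked with an initial state dict; a minimal end-to-end run might look like this (the printed output depends on your node implementations):

# Run the compiled graph (sketch)
initial_state = {
    "input": "Summarize recent advances in multi-agent systems",
    "chat_history": [],
    "intermediate_steps": [],
    "output": ""
}
final_state = app.invoke(initial_state)
print(final_state["output"])  # "Final synthesized response" with the stub nodes above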
- Custom Retrievers:

from langchain.schema import BaseRetriever, Document
class CustomRetriever(BaseRetriever):
def get_relevant_documents(self, query: str) -> list[Document]:
# Custom retrieval logic
        return [Document(page_content="relevant content", metadata={})]

- Memory Management:
from langchain.memory import ConversationBufferWindowMemory
memory = ConversationBufferWindowMemory(
k=5, # Keep last 5 exchanges
return_messages=True
)
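One common pattern (a sketch, assuming the agent and tools from the tutorial above) is to attach this memory to the executor so conversations persist across turns:

# Attach memory to the agent executor (sketch)
agent_executor_with_memory = AgentExecutor(
    agent=agent,
    tools=tools,
    memory=memory,
    verbose=True
)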
🤖 Multi-Agent Conversations with AutoGen

pip install pyautogen

import autogen
# Configuration for different agents
config_list = [
{
"model": "gpt-4",
"api_key": "your-openai-api-key"
}
]
# Create different types of agents
user_proxy = autogen.UserProxyAgent(
name="user_proxy",
system_message="A human admin who gives tasks and provides feedback.",
code_execution_config={"last_n_messages": 2, "work_dir": "groupchat"},
human_input_mode="TERMINATE"
)
coder = autogen.AssistantAgent(
name="coder",
system_message="You are a skilled Python programmer. Write clean, efficient code.",
llm_config={"config_list": config_list}
)
reviewer = autogen.AssistantAgent(
name="code_reviewer",
system_message="You review code for bugs, efficiency, and best practices.",
llm_config={"config_list": config_list}
)
# Group chat for multiple agents
groupchat = autogen.GroupChat(
agents=[user_proxy, coder, reviewer],
messages=[],
max_round=10
)
manager = autogen.GroupChatManager(
groupchat=groupchat,
llm_config={"config_list": config_list}
)
# Start the conversation
user_proxy.initiate_chat(
manager,
message="Create a Python function to analyze stock market data and have it reviewed."
)

# Custom Agent with Specific Tools
class DataAnalysisAgent(autogen.AssistantAgent):
def __init__(self, name, **kwargs):
super().__init__(name, **kwargs)
self.tools = ["pandas", "numpy", "matplotlib", "seaborn"]
def analyze_data(self, data_path):
# Custom data analysis logic
pass
# Agent with Web Browsing Capability
browsing_agent = autogen.AssistantAgent(
name="web_researcher",
system_message="You can browse the web to gather information.",
llm_config={
"config_list": config_list,
"functions": [
{
"name": "web_search",
"description": "Search the web for information",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string"}
}
}
}
]
}
)

👷 Professional Agent Teams with CrewAI
pip install crewai crewai-tools

from crewai import Agent, Task, Crew, Process
from crewai_tools import SerperDevTool, ScrapeWebsiteTool
# Initialize tools
search_tool = SerperDevTool()
scrape_tool = ScrapeWebsiteTool()
# Define agents with specific roles
researcher = Agent(
role='Senior Research Analyst',
goal='Uncover cutting-edge developments in AI and machine learning',
backstory="""You are a Senior Research Analyst at a leading tech think tank.
Your expertise lies in identifying emerging trends and technologies in AI.""",
verbose=True,
allow_delegation=False,
tools=[search_tool, scrape_tool]
)
writer = Agent(
role='Tech Content Strategist',
goal='Craft compelling content on tech advancements',
backstory="""You are a renowned Tech Content Strategist, known for your insightful
and engaging articles on technology and innovation.""",
verbose=True,
allow_delegation=True
)
editor = Agent(
role='Senior Editor',
goal='Ensure content quality and coherence',
backstory="""You are a Senior Editor with decades of experience in publishing.
Your role is to refine and perfect written content.""",
verbose=True,
allow_delegation=False
)
# Define tasks
research_task = Task(
description="""Conduct comprehensive research on the latest AI developments.
Focus on breakthrough technologies, new frameworks, and industry applications.""",
agent=researcher,
expected_output="A detailed research report with key findings and sources."
)
writing_task = Task(
description="""Using the research report, create an engaging article about AI developments.
The article should be informative yet accessible to a general tech audience.""",
agent=writer,
expected_output="A well-structured article ready for publication."
)
editing_task = Task(
description="""Review the article for clarity, coherence, and engagement.
Ensure proper structure and flow.""",
agent=editor,
expected_output="A polished, publication-ready article."
)
# Assemble the crew
crew = Crew(
agents=[researcher, writer, editor],
tasks=[research_task, writing_task, editing_task],
verbose=2,
process=Process.sequential
)
# Execute the workflow
result = crew.kickoff()

# Hierarchical Process
crew_hierarchical = Crew(
agents=[researcher, writer, editor],
tasks=[research_task, writing_task, editing_task],
process=Process.hierarchical,
    manager_llm=ChatOpenAI(model="gpt-4")  # ChatOpenAI from langchain_openai (imported in the LangChain section above)
)
# Custom Tools for Agents
from crewai_tools import BaseTool
class CustomAnalysisTool(BaseTool):
name: str = "Data Analysis Tool"
description: str = "Analyzes data and provides insights"
def _run(self, data: str) -> str:
# Custom analysis logic
return "Analysis results"
# Agent with custom tool
analyst = Agent(
role='Data Analyst',
goal='Provide data-driven insights',
backstory='Expert in statistical analysis and data interpretation',
tools=[CustomAnalysisTool()],
verbose=True
)

🗄️ Advanced RAG Implementation
# Chroma (Lightweight, local)
pip install chromadb
# Pinecone (Managed, scalable)
pip install pinecone-client
# Weaviate (Open source, GraphQL)
pip install weaviate-client
# Qdrant (High performance)
pip install qdrant-client

import chromadb
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
class AdvancedRAGSystem:
def __init__(self):
# Initialize components
self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
self.vector_db = chromadb.PersistentClient(path="./chroma_db")
self.collection = self.vector_db.get_or_create_collection("documents")
# Initialize generation model
self.tokenizer = AutoTokenizer.from_pretrained("microsoft/DialoGPT-medium")
self.generator = AutoModelForCausalLM.from_pretrained("microsoft/DialoGPT-medium")
def add_documents(self, documents):
"""
Add a list of documents (strings) to the ChromaDB collection.
"""
embeddings = self.embedding_model.encode(documents).tolist()
ids = [f"doc_{i}" for i in range(len(documents))]
self.collection.add(documents=documents, embeddings=embeddings, ids=ids)
def retrieve(self, query, top_k=3):
"""
Retrieve top_k relevant documents based on the query.
"""
query_embedding = self.embedding_model.encode([query])[0].tolist()
results = self.collection.query(query_embeddings=[query_embedding], n_results=top_k)
return results['documents'][0] if results and results['documents'] else []
def generate_response(self, query):
"""
Generate a response based on retrieved documents and query.
"""
# Retrieve relevant context
context_docs = self.retrieve(query)
context = " ".join(context_docs)
# Combine context with query
input_text = f"Context: {context}\nUser: {query}\nBot:"
inputs = self.tokenizer.encode(input_text, return_tensors="pt", max_length=1024, truncation=True)
# Generate a response
output = self.generator.generate(
inputs,
max_length=1024,
pad_token_id=self.tokenizer.eos_token_id,
do_sample=True,
top_k=50,
top_p=0.95,
temperature=0.8,
num_return_sequences=1
)
response = self.tokenizer.decode(output[0], skip_special_tokens=True)
# Extract only the bot's response
return response.split("Bot:")[-1].strip()
# Example usage:
# rag = AdvancedRAGSystem()
# rag.add_documents(["The Eiffel Tower is located in Paris.", "The capital of France is Paris."])
# answer = rag.generate_response("Where is the Eiffel Tower?")
# print(answer)

We welcome contributions! Here's how you can help:
- Bug reports with reproducible examples
- Documentation errors or unclear explanations
- Missing or outdated information
- New tutorials or project ideas
- Better explanations or examples
- Tool integrations and frameworks
- Write tutorials and guides
- Add code examples and projects
- Review and update existing content
- Translate content to other languages
- Fix bugs in example code
- Add new features to project templates
- Improve performance and efficiency
- Add tests and documentation
- Fork the repository and create a feature branch
- Follow the existing style and structure
- Test your changes thoroughly
- Update documentation as needed
- Submit a pull request with clear description
Contributors will be recognized in:
- README contributor section
- Individual tutorial/project credits
- Community showcase features
- Annual contributor awards
This project is licensed under the MIT License - see the LICENSE file for details.
If you use this resource in your research or projects, please cite:
@misc{joshi2025agentic,
  title={Awesome Agentic AI Learning Resource By DevKay},
  author={Dharmin Joshi (DevKay)},
  year={2025},
  url={https://github.com/DharminJoshi/Awesome-Agentic-AI-Learning-Resource-By-DevKay}
}

This resource is designed to be your complete companion in mastering AI and building intelligent agents. Whether you're just starting or looking to build production systems, you'll find actionable content here.
- Start with fundamentals - Build strong foundations
- Practice consistently - Work on projects regularly
- Join communities - Learn from others and share knowledge
- Stay updated - AI moves fast, keep learning
- Build and ship - Create real applications people use
- ⭐ Star this repository to stay updated
- 🐛 Report issues to help improve content
- 💬 Join discussions in GitHub Discussions
- 🔔 Watch for updates - new content added regularly
"The best time to start learning AI was yesterday. The second best time is now."
Ready to build the future with intelligent agents? Start your journey today! 🚀
🚀 Powered and maintained by Dharmin Joshi / DevKay
💬 Join the community on Discord: Invite Link