Python Programming: Advanced Concepts
1. Advanced Data Structures
Collections Module
Python's collections module provides specialized container datatypes that go beyond the built-in types:
python
1from collections import Counter, defaultdict, namedtuple, deque
23# Counter - count occurrences of elements
4words = ['apple', 'orange', 'apple', 'banana', 'apple', 'orange']
5word_counts = Counter(words)
6print(word_counts) # Counter({'apple': 3, 'orange': 2, 'banana': 1})
7print(word_counts.most_common(1)) # [('apple', 3)]
89# defaultdict - dictionary with default values
10fruit_colors = defaultdict(list)
11fruit_colors['apple'].append('red') # No KeyError if key doesn't exist
12fruit_colors['apple'].append('green')
13fruit_colors['banana'].append('yellow')
14print(dict(fruit_colors)) # {'apple': ['red', 'green'], 'banana': ['yellow']}
1516# namedtuple - tuple with named fields
17Person = namedtuple('Person', ['name', 'age', 'city'])
18alice = Person('Alice', 30, 'New York')
19print(alice.name, alice.age, alice.city) # Alice 30 New York
20print(alice[0], alice[1], alice[2]) # Alice 30 New York
2122# deque - double-ended queue with fast appends and pops
23queue = deque(['a', 'b', 'c'])
24queue.append('d') # Add to right
25queue.appendleft('z') # Add to left
26print(queue) # deque(['z', 'a', 'b', 'c', 'd'])
27print(queue.pop()) # 'd'
28print(queue.popleft()) # 'z'
29print(queue) # deque(['a', 'b', 'c'])
Comprehensions
python
1# List comprehension
2numbers = [1, 2, 3, 4, 5]
3squares = [x**2 for x in numbers]
4print(squares) # [1, 4, 9, 16, 25]
56# List comprehension with condition
7even_squares = [x**2 for x in numbers if x % 2 == 0]
8print(even_squares) # [4, 16]
910# Dictionary comprehension
11square_dict = {x: x**2 for x in numbers}
12print(square_dict) # {1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
1314# Set comprehension
15square_set = {x**2 for x in numbers}
16print(square_set) # {1, 4, 9, 16, 25}
1718# Generator expression (memory efficient)
19sum_of_squares = sum(x**2 for x in numbers)
20print(sum_of_squares) # 55
Advanced Dictionary Operations
python
1# Merging dictionaries (Python 3.9+)
2dict1 = {'a': 1, 'b': 2}
3dict2 = {'b': 3, 'c': 4}
4merged = dict1 | dict2 # {'a': 1, 'b': 3, 'c': 4}
5print(merged)
67# Dictionary unpacking
8user = {'name': 'Alice', 'age': 30}
9defaults = {'country': 'USA', 'language': 'English'}
10full_profile = {**defaults, **user}
11print(full_profile) # {'country': 'USA', 'language': 'English', 'name': 'Alice', 'age': 30}
1213# Dictionary views
14student = {'name': 'Bob', 'grades': [85, 90, 78]}
15keys = student.keys()
16values = student.values()
17items = student.items()
1819print(keys) # dict_keys(['name', 'grades'])
20print(values) # dict_values(['Bob', [85, 90, 78]])
21print(items) # dict_items([('name', 'Bob'), ('grades', [85, 90, 78])])
2223# Views are dynamic
24student['age'] = 20
25print(keys) # dict_keys(['name', 'grades', 'age'])
2. File Handling
Reading and Writing Text Files
python
1# Writing to a text file
2with open('example.txt', 'w') as file:
3file.write('Hello, Python!
4')
5file.write('File handling is essential.
6')
7
8# Reading from a text file
9with open('example.txt', 'r') as file:
10content = file.read()
11print(content)
12
13# Reading line by line
14with open('example.txt', 'r') as file:
15for line in file:
16print(line.strip()) # strip() removes the newline character
17
18# Appending to a file
19with open('example.txt', 'a') as file:
20file.write('This line is appended.
21')
Working with CSV Files
python
1import csv
23# Writing CSV data
4data = [
5['Name', 'Age', 'Country'],
6['Alice', 30, 'USA'],
7['Bob', 25, 'Canada'],
8['Charlie', 35, 'UK']
9]
1011with open('people.csv', 'w', newline='') as file:
12writer = csv.writer(file)
13writer.writerows(data)
14
15# Reading CSV data
16with open('people.csv', 'r', newline='') as file:
17reader = csv.reader(file)
18for row in reader:
19print(row)
20
21# Using DictReader and DictWriter
22with open('people.csv', 'r', newline='') as file:
23reader = csv.DictReader(file)
24for row in reader:
25print(f"{row['Name']} is {row['Age']} years old from {row['Country']}")
Working with JSON
python
1import json
23# Python dictionary
4user = {
5'name': 'Alice',
6'age': 30,
7'is_active': True,
8'skills': ['Python', 'JavaScript', 'SQL'],
9'address': {
10'city': 'New York',
11'country': 'USA'
12}
13}
1415# Convert Python object to JSON string
16json_string = json.dumps(user, indent=4)
17print(json_string)
1819# Write JSON to file
20with open('user.json', 'w') as file:
21json.dump(user, file, indent=4)
22
23# Read JSON from file
24with open('user.json', 'r') as file:
25loaded_user = json.load(file)
26print(loaded_user)
27
28# Parse JSON string
29json_data = '{"name": "Bob", "age": 25}'
30parsed_data = json.loads(json_data)
31print(parsed_data['name']) # Bob
Working with Binary Files
python
1# Writing binary data
2with open('binary_file.bin', 'wb') as file:
3file.write(b'\x48\x65\x6c\x6c\x6f') # "Hello" in hex
4
5# Reading binary data
6with open('binary_file.bin', 'rb') as file:
7data = file.read()
8print(data) # b'Hello'
9print(data.decode('utf-8')) # Convert bytes to string: "Hello"
3. Exception Handling
Basic Exception Handling
python
1# Basic try-except
2try:
3x = 10 / 0 # This will raise a ZeroDivisionError
4except ZeroDivisionError:
5print("Cannot divide by zero!")
6
7# Handling multiple exceptions
8try:
9num = int(input("Enter a number: "))
10result = 10 / num
11print(f"Result: {result}")
12except ValueError:
13print("That's not a valid number!")
14except ZeroDivisionError:
15print("Cannot divide by zero!")
16
17# Using else clause (runs if no exception occurs)
18try:
19num = int(input("Enter a number: "))
20result = 10 / num
21except (ValueError, ZeroDivisionError) as e:
22print(f"Error: {e}")
23else:
24print(f"Result: {result}")
25
26# Using finally clause (always runs)
27try:
28file = open("example.txt", "r")
29content = file.read()
30except FileNotFoundError:
31print("File not found!")
32finally:
33# This will run whether an exception occurred or not
34print("Execution completed")
35# Close the file if it was opened
36if 'file' in locals() and not file.closed:
37file.close()
Creating Custom Exceptions
python
1# Define custom exception
2class InsufficientFundsError(Exception):
3"""Raised when a withdrawal exceeds the available balance"""
4def __init__(self, balance, amount):
5self.balance = balance
6self.amount = amount
7self.message = f"Cannot withdraw $ {amount}. Only $ {balance} available."
8super().__init__(self.message)
910# Using custom exception
11class BankAccount:
12def __init__(self, balance=0):
13self.balance = balance
14
15def deposit(self, amount):
16self.balance += amount
17return self.balance
18
19def withdraw(self, amount):
20if amount > self.balance:
21raise InsufficientFundsError(self.balance, amount)
22self.balance -= amount
23return self.balance
2425# Test the custom exception
26account = BankAccount(100)
27try:
28account.withdraw(150)
29except InsufficientFundsError as e:
30print(e) # Cannot withdraw $150. Only $100 available.
Context Managers with try-finally
python
1# Context manager using class
2class FileManager:
3def __init__(self, filename, mode):
4self.filename = filename
5self.mode = mode
6self.file = None
7
8def __enter__(self):
9self.file = open(self.filename, self.mode)
10return self.file
11
12def __exit__(self, exc_type, exc_val, exc_tb):
13if self.file:
14self.file.close()
15# Return True to suppress exception, False to propagate it
16return False
1718# Using the context manager
19with FileManager('example.txt', 'w') as file:
20file.write('Using a custom context manager')
21
22# Context manager using decorator
23from contextlib import contextmanager
2425@contextmanager
26def open_file(filename, mode):
27try:
28file = open(filename, mode)
29yield file
30finally:
31file.close()
3233# Using the decorator-based context manager
34with open_file('example.txt', 'r') as file:
35content = file.read()
36print(content)
4. Decorators and Generators
Decorators
Decorators are functions that modify the behavior of other functions or methods:
python
1# Basic decorator
2def my_decorator(func):
3def wrapper():
4print("Something is happening before the function is called.")
5func()
6print("Something is happening after the function is called.")
7return wrapper
89@my_decorator
10def say_hello():
11print("Hello!")
1213say_hello()
14# Output:
15# Something is happening before the function is called.
16# Hello!
17# Something is happening after the function is called.
1819# Decorator with arguments
20def repeat(n):
21def decorator(func):
22def wrapper(*args, **kwargs):
23for _ in range(n):
24result = func(*args, **kwargs)
25return result
26return wrapper
27return decorator
2829@repeat(3)
30def greet(name):
31print(f"Hello, {name}!")
32
33greet("Alice")
34# Output:
35# Hello, Alice!
36# Hello, Alice!
37# Hello, Alice!
3839# Practical example: timing decorator
40import time
4142def timing_decorator(func):
43def wrapper(*args, **kwargs):
44start_time = time.time()
45result = func(*args, **kwargs)
46end_time = time.time()
47print(f"{func.__name__} took {end_time - start_time:.4f} seconds to run")
48return result
49return wrapper
5051@timing_decorator
52def slow_function():
53time.sleep(1)
54return "Function completed"
5556print(slow_function())
57# Output:
58# slow_function took 1.0012 seconds to run
59# Function completed
Generators
Generators are functions that can pause and resume their execution, yielding values one at a time:
python
1# Basic generator
2def count_up_to(n):
3i = 1
4while i <= n:
5yield i
6i += 1
78# Using the generator
9counter = count_up_to(5)
10print(next(counter)) # 1
11print(next(counter)) # 2
12print(next(counter)) # 3
1314# Iterating over a generator
15for num in count_up_to(5):
16print(num) # Prints 1, 2, 3, 4, 5
1718# Generator expressions
19squares = (x**2 for x in range(1, 6))
20print(list(squares)) # [1, 4, 9, 16, 25]
2122# Memory efficiency example
23import sys
2425# List comprehension (stores all values in memory)
26list_comp = [x**2 for x in range(10000)]
27# Generator expression (generates values on-the-fly)
28gen_exp = (x**2 for x in range(10000))
2930print(f"List size: {sys.getsizeof(list_comp)} bytes")
31print(f"Generator size: {sys.getsizeof(gen_exp)} bytes")
3233# Infinite sequence generator
34def fibonacci():
35a, b = 0, 1
36while True:
37yield a
38a, b = b, a + b
3940# Get first 10 Fibonacci numbers
41fib = fibonacci()
42for _ in range(10):
43print(next(fib), end=" ") # 0 1 1 2 3 5 8 13 21 34
Combining Decorators and Generators
python
1# Decorator for a generator
2def debug_generator(func):
3def wrapper(*args, **kwargs):
4gen = func(*args, **kwargs)
5for value in gen:
6print(f"Generator yielded: {value}")
7yield value
8return wrapper
910@debug_generator
11def numbers(n):
12for i in range(n):
13yield i * i
1415# Using the decorated generator
16for num in numbers(5):
17pass # The decorator will print each yielded value
5. Working with APIs
Python makes it easy to interact with web APIs using the requests library:
python
1import requests
23# Making a GET request
4response = requests.get('https://jsonplaceholder.typicode.com/posts/1')
5print(f"Status code: {response.status_code}")
6print(f"Content type: {response.headers['Content-Type']}")
78# Parsing JSON response
9data = response.json()
10print(f"Post title: {data['title']}")
1112# Making a GET request with parameters
13params = {'userId': 1}
14response = requests.get('https://jsonplaceholder.typicode.com/posts', params=params)
15posts = response.json()
16print(f"Number of posts by user 1: {len(posts)}")
1718# Making a POST request
19new_post = {
20'title': 'Python API Tutorial',
21'body': 'This is a post about using APIs with Python',
22'userId': 1
23}
24response = requests.post('https://jsonplaceholder.typicode.com/posts', json=new_post)
25created_post = response.json()
26print(f"Created post ID: {created_post['id']}")
2728# Making a PUT request (update)
29updated_post = {
30'id': 1,
31'title': 'Updated Title',
32'body': 'This post has been updated',
33'userId': 1
34}
35response = requests.put('https://jsonplaceholder.typicode.com/posts/1', json=updated_post)
36print(f"Update status: {response.status_code}")
3738# Making a DELETE request
39response = requests.delete('https://jsonplaceholder.typicode.com/posts/1')
40print(f"Delete status: {response.status_code}")
4142# Handling authentication
43response = requests.get(
44'https://api.github.com/user',
45auth=('username', 'personal_access_token')
46)
4748# Using sessions for multiple requests
49session = requests.Session()
50session.headers.update({'User-Agent': 'Python API Tutorial'})
51response = session.get('https://api.github.com/repos/python/cpython')
52print(response.json()['description'])
Async API Requests
python
1import asyncio
2import aiohttp
3import time
45async def fetch_data(session, url):
6async with session.get(url) as response:
7return await response.json()
89async def main():
10start_time = time.time()
11
12# Create a session
13async with aiohttp.ClientSession() as session:
14# Create tasks for concurrent execution
15tasks = []
16for i in range(1, 11):
17url = f'https://jsonplaceholder.typicode.com/posts/{i}'
18tasks.append(fetch_data(session, url))
19
20# Wait for all tasks to complete
21results = await asyncio.gather(*tasks)
22
23# Process results
24for i, result in enumerate(results, 1):
25print(f"Post {i} title: {result['title'][:30]}...")
26
27end_time = time.time()
28print(f"Fetched 10 posts in {end_time - start_time:.2f} seconds")
2930# Run the async function
31asyncio.run(main())
6. Data Analysis with Pandas
Pandas is a powerful library for data manipulation and analysis:
python
1import pandas as pd
2import numpy as np
3import matplotlib.pyplot as plt
45# Creating a DataFrame
6data = {
7'Name': ['Alice', 'Bob', 'Charlie', 'David', 'Eva'],
8'Age': [25, 30, 35, 40, 45],
9'City': ['New York', 'Boston', 'Chicago', 'Boston', 'New York'],
10'Salary': [50000, 60000, 70000, 80000, 90000]
11}
1213df = pd.DataFrame(data)
14print(df)
1516# Basic DataFrame operations
17print(df.head(2)) # First 2 rows
18print(df.tail(2)) # Last 2 rows
19print(df.info()) # DataFrame info
20print(df.describe()) # Statistical summary
2122# Selecting data
23print(df['Name']) # Select a column
24print(df[['Name', 'Age']]) # Select multiple columns
25print(df.loc[0]) # Select a row by label
26print(df.iloc[0]) # Select a row by position
27print(df.loc[0:2, 'Name':'City']) # Select rows and columns by label
28print(df.iloc[0:2, 0:2]) # Select rows and columns by position
2930# Filtering data
31print(df[df['Age'] > 30]) # Filter by condition
32print(df[(df['Age'] > 30) & (df['Salary'] > 70000)]) # Multiple conditions
3334# Adding and modifying columns
35df['Experience'] = [3, 5, 8, 12, 15] # Add a new column
36df['Salary'] = df['Salary'] * 1.1 # Modify a column
37print(df)
3839# Grouping and aggregation
40city_groups = df.groupby('City')
41print(city_groups.mean()) # Mean of numeric columns by city
42print(city_groups.agg({
43'Age': 'mean',
44'Salary': ['min', 'max', 'mean'],
45'Experience': 'sum'
46}))
4748# Handling missing values
49df_missing = df.copy()
50df_missing.loc[0, 'Age'] = np.nan
51df_missing.loc[2, 'Salary'] = np.nan
5253print(df_missing.isna().sum()) # Count missing values
54print(df_missing.fillna(0)) # Fill missing values with 0
55print(df_missing.dropna()) # Drop rows with missing values
5657# Merging DataFrames
58df1 = pd.DataFrame({
59'ID': [1, 2, 3, 4],
60'Name': ['Alice', 'Bob', 'Charlie', 'David'],
61'Department': ['HR', 'IT', 'Finance', 'IT']
62})
6364df2 = pd.DataFrame({
65'ID': [1, 2, 3, 5],
66'Salary': [50000, 60000, 70000, 90000],
67'Experience': [3, 5, 8, 15]
68})
6970# Inner join
71merged_inner = pd.merge(df1, df2, on='ID', how='inner')
72print(merged_inner)
7374# Outer join
75merged_outer = pd.merge(df1, df2, on='ID', how='outer')
76print(merged_outer)
7778# Basic visualization
79df.plot(kind='bar', x='Name', y='Salary', figsize=(10, 6))
80plt.title('Salary by Employee')
81plt.ylabel('Salary ($)')
82plt.tight_layout()
83plt.show()
Time Series Analysis
python
1# Create a time series
2dates = pd.date_range('20230101', periods=12, freq='M')
3ts = pd.Series(np.random.randn(12) * 100 + 500, index=dates)
4print(ts)
56# Resampling
7print(ts.resample('Q').mean()) # Quarterly average
8print(ts.resample('Q').agg(['min', 'max', 'mean'])) # Multiple aggregations
910# Rolling statistics
11print(ts.rolling(window=3).mean()) # 3-month rolling average
1213# Shifting
14print(ts.shift(1)) # Shift values forward by 1
15print(ts.diff()) # Difference between current and previous value
1617# Time series visualization
18ts.plot(figsize=(10, 6))
19ts.rolling(window=3).mean().plot(style='r--')
20plt.title('Monthly Values with 3-Month Rolling Average')
21plt.ylabel('Value')
22plt.legend(['Original', '3-Month Avg'])
23plt.tight_layout()
24plt.show()
7. Web Development with Flask
Flask is a lightweight web framework for Python:
python
1from flask import Flask, render_template, request, redirect, url_for, jsonify
23# Create a Flask application
4app = Flask(__name__)
56# Sample data
7tasks = [
8{'id': 1, 'title': 'Learn Python', 'done': True},
9{'id': 2, 'title': 'Learn Flask', 'done': False},
10{'id': 3, 'title': 'Build a web app', 'done': False}
11]
1213# Route for the home page
14@app.route('/')
15def index():
16return render_template('index.html', tasks=tasks)
1718# Route to get all tasks (API)
19@app.route('/api/tasks', methods=['GET'])
20def get_tasks():
21return jsonify({'tasks': tasks})
2223# Route to get a specific task (API)
24@app.route('/api/tasks/<int:task_id>', methods=['GET'])
25def get_task(task_id):
26task = next((task for task in tasks if task['id'] == task_id), None)
27if task:
28return jsonify({'task': task})
29return jsonify({'error': 'Task not found'}), 404
3031# Route to create a new task (API)
32@app.route('/api/tasks', methods=['POST'])
33def create_task():
34if not request.json or 'title' not in request.json:
35return jsonify({'error': 'Title is required'}), 400
36
37task_id = max(task['id'] for task in tasks) + 1
38task = {
39'id': task_id,
40'title': request.json['title'],
41'done': False
42}
43tasks.append(task)
44return jsonify({'task': task}), 201
4546# Route to update a task (API)
47@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
48def update_task(task_id):
49task = next((task for task in tasks if task['id'] == task_id), None)
50if not task:
51return jsonify({'error': 'Task not found'}), 404
52
53if 'title' in request.json:
54task['title'] = request.json['title']
55if 'done' in request.json:
56task['done'] = request.json['done']
57
58return jsonify({'task': task})
5960# Route to delete a task (API)
61@app.route('/api/tasks/<int:task_id>', methods=['DELETE'])
62def delete_task(task_id):
63task = next((task for task in tasks if task['id'] == task_id), None)
64if not task:
65return jsonify({'error': 'Task not found'}), 404
66
67tasks.remove(task)
68return jsonify({'result': True})
6970# Form submission route
71@app.route('/add', methods=['POST'])
72def add_task():
73title = request.form.get('title')
74if title:
75task_id = max(task['id'] for task in tasks) + 1
76tasks.append({'id': task_id, 'title': title, 'done': False})
77return redirect(url_for('index'))
7879# Toggle task status route
80@app.route('/toggle/<int:task_id>')
81def toggle_task(task_id):
82task = next((task for task in tasks if task['id'] == task_id), None)
83if task:
84task['done'] = not task['done']
85return redirect(url_for('index'))
8687# Delete task route
88@app.route('/delete/<int:task_id>')
89def remove_task(task_id):
90task = next((task for task in tasks if task['id'] == task_id), None)
91if task:
92tasks.remove(task)
93return redirect(url_for('index'))
9495if __name__ == '__main__':
96app.run(debug=True)
HTML Template Example
Here's a simple HTML template for the Flask app (save as templates/index.html):
html
1<!DOCTYPE html>
2<html lang="en">
3<head>
4<meta charset="UTF-8">
5<meta name="viewport" content="width=device-width, initial-scale=1.0">
6<title>Flask Todo App</title>
7<style>
8body {
9font-family: Arial, sans-serif;
10max-width: 800px;
11margin: 0 auto;
12padding: 20px;
13}
14.task {
15display: flex;
16align-items: center;
17padding: 10px;
18border-bottom: 1px solid #eee;
19}
20.task.done {
21text-decoration: line-through;
22color: #888;
23}
24.task-actions {
25margin-left: auto;
26}
27form {
28display: flex;
29margin-bottom: 20px;
30}
31input[type="text"] {
32flex-grow: 1;
33padding: 10px;
34border: 1px solid #ddd;
35border-radius: 4px 0 0 4px;
36}
37button {
38padding: 10px 15px;
39background-color: #4CAF50;
40color: white;
41border: none;
42border-radius: 0 4px 4px 0;
43cursor: pointer;
44}
45.task-actions a {
46margin-left: 10px;
47color: #666;
48text-decoration: none;
49}
50</style>
51</head>
52<body>
53<h1>Todo List</h1>
54
55<form action="/add" method="post">
56<input type="text" name="title" placeholder="Add a new task..." required>
57<button type="submit">Add</button>
58</form>
59
60<div class="tasks">
61{% for task in tasks %}
62<div class="task {% if task.done %}done{% endif %}">
63<span>{{ task.title }}</span>
64<div class="task-actions">
65<a href="/toggle/{{ task.id }}">{% if task.done %}Undo{% else %}Done{% endif %}</a>
66<a href="/delete/{{ task.id }}">Delete</a>
67</div>
68</div>
69{% endfor %}
70</div>
71</body>
72</html>
8. Testing in Python
Unit Testing with unittest
Python's built-in unittest framework allows you to write and run tests:
python
1import unittest
23# The code to test
4def add(a, b):
5return a + b
67def subtract(a, b):
8return a - b
910def multiply(a, b):
11return a * b
1213def divide(a, b):
14if b == 0:
15raise ValueError("Cannot divide by zero")
16return a / b
1718# Test case class
19class TestMathFunctions(unittest.TestCase):
20
21def test_add(self):
22self.assertEqual(add(10, 5), 15)
23self.assertEqual(add(-1, 1), 0)
24self.assertEqual(add(-1, -1), -2)
25
26def test_subtract(self):
27self.assertEqual(subtract(10, 5), 5)
28self.assertEqual(subtract(-1, 1), -2)
29self.assertEqual(subtract(-1, -1), 0)
30
31def test_multiply(self):
32self.assertEqual(multiply(10, 5), 50)
33self.assertEqual(multiply(-1, 1), -1)
34self.assertEqual(multiply(-1, -1), 1)
35
36def test_divide(self):
37self.assertEqual(divide(10, 5), 2)
38self.assertEqual(divide(-1, 1), -1)
39self.assertEqual(divide(5, 2), 2.5)
40
41# Test division by zero
42with self.assertRaises(ValueError):
43divide(10, 0)
44
45# Setup and teardown methods
46def setUp(self):
47# Code to run before each test
48print("Setting up test...")
49
50def tearDown(self):
51# Code to run after each test
52print("Tearing down test...")
53
54@classmethod
55def setUpClass(cls):
56# Code to run once before all tests
57print("Setting up test class...")
58
59@classmethod
60def tearDownClass(cls):
61# Code to run once after all tests
62print("Tearing down test class...")
6364if __name__ == '__main__':
65unittest.main()
Testing with pytest
pytest is a more modern testing framework with simpler syntax:
python
1# Save as test_math.py
2import pytest
34# The code to test
5def add(a, b):
6return a + b
78def divide(a, b):
9if b == 0:
10raise ValueError("Cannot divide by zero")
11return a / b
1213# Test functions
14def test_add():
15assert add(10, 5) == 15
16assert add(-1, 1) == 0
17assert add(-1, -1) == -2
1819def test_divide():
20assert divide(10, 5) == 2
21assert divide(-1, 1) == -1
22assert divide(5, 2) == 2.5
2324def test_divide_by_zero():
25with pytest.raises(ValueError):
26divide(10, 0)
2728# Parameterized tests
29@pytest.mark.parametrize("a, b, expected", [
30(10, 5, 15),
31(-1, 1, 0),
32(-1, -1, -2),
33(0, 0, 0)
34])
35def test_add_parameterized(a, b, expected):
36assert add(a, b) == expected
3738# Fixtures
39@pytest.fixture
40def sample_data():
41return [1, 2, 3, 4, 5]
4243def test_with_fixture(sample_data):
44assert len(sample_data) == 5
45assert sum(sample_data) == 15
Mock Objects
Using mock objects to isolate code for testing:
python
1import unittest
2from unittest.mock import Mock, patch
34# Function that makes an API call
5def get_user_data(user_id):
6# In a real app, this would call an API
7response = make_api_request(f"https://api.example.com/users/{user_id}")
8if response.status_code == 200:
9return response.json()
10return None
1112# Function that processes user data
13def process_user(user_id):
14user_data = get_user_data(user_id)
15if user_data and user_data.get('active'):
16return f"Active user: {user_data['name']}"
17return "Inactive or missing user"
1819# Test case using mocks
20class TestUserProcessing(unittest.TestCase):
21
22def test_process_active_user(self):
23# Create a mock for get_user_data
24mock_get_data = Mock(return_value={
25'id': 123,
26'name': 'Test User',
27'active': True
28})
29
30# Patch the get_user_data function
31with patch('__main__.get_user_data', mock_get_data):
32result = process_user(123)
33self.assertEqual(result, "Active user: Test User")
34mock_get_data.assert_called_once_with(123)
35
36def test_process_inactive_user(self):
37# Create a mock for get_user_data
38mock_get_data = Mock(return_value={
39'id': 456,
40'name': 'Inactive User',
41'active': False
42})
43
44# Patch the get_user_data function
45with patch('__main__.get_user_data', mock_get_data):
46result = process_user(456)
47self.assertEqual(result, "Inactive or missing user")
48
49def test_process_missing_user(self):
50# Create a mock for get_user_data
51mock_get_data = Mock(return_value=None)
52
53# Patch the get_user_data function
54with patch('__main__.get_user_data', mock_get_data):
55result = process_user(789)
56self.assertEqual(result, "Inactive or missing user")
9. Concurrency and Parallelism
Threading
python
1import threading
2import time
34def worker(name, delay):
5print(f"{name} started")
6time.sleep(delay)
7print(f"{name} finished")
89# Create threads
10thread1 = threading.Thread(target=worker, args=("Thread-1", 2))
11thread2 = threading.Thread(target=worker, args=("Thread-2", 4))
1213# Start threads
14start_time = time.time()
15thread1.start()
16thread2.start()
1718# Wait for threads to complete
19thread1.join()
20thread2.join()
2122end_time = time.time()
23print(f"All threads completed in {end_time - start_time:.2f} seconds")
Multiprocessing
python
1import multiprocessing
2import time
34def cpu_bound_task(number):
5"""A CPU-bound task that computes the sum of squares."""
6return sum(i * i for i in range(number))
78def process_worker(numbers):
9"""Process a list of numbers using a single process."""
10start_time = time.time()
11results = [cpu_bound_task(number) for number in numbers]
12end_time = time.time()
13print(f"Sequential processing took {end_time - start_time:.2f} seconds")
14return results
1516def process_worker_parallel(numbers):
17"""Process a list of numbers using multiple processes."""
18start_time = time.time()
19
20# Create a pool of processes
21with multiprocessing.Pool() as pool:
22# Map the function to the list of numbers
23results = pool.map(cpu_bound_task, numbers)
24
25end_time = time.time()
26print(f"Parallel processing took {end_time - start_time:.2f} seconds")
27return results
2829if __name__ == "__main__":
30# List of numbers to process
31numbers = [10000000, 20000000, 30000000, 40000000]
32
33# Process sequentially
34sequential_results = process_worker(numbers)
35
36# Process in parallel
37parallel_results = process_worker_parallel(numbers)
38
39# Verify results are the same
40print(f"Results match: {sequential_results == parallel_results}")
Asyncio
python
1import asyncio
2import time
34async def async_task(name, delay):
5print(f"{name} started at {time.strftime('%X')}")
6await asyncio.sleep(delay) # Non-blocking sleep
7print(f"{name} finished at {time.strftime('%X')}")
8return f"{name} completed"
910async def main():
11# Create tasks
12task1 = asyncio.create_task(async_task("Task-1", 2))
13task2 = asyncio.create_task(async_task("Task-2", 3))
14task3 = asyncio.create_task(async_task("Task-3", 1))
15
16print(f"Started at {time.strftime('%X')}")
17
18# Wait for all tasks to complete
19results = await asyncio.gather(task1, task2, task3)
20
21print(f"Finished at {time.strftime('%X')}")
22print(f"Results: {results}")
2324# Run the async program
25asyncio.run(main())
Combining Asyncio with Multiprocessing
python
1import asyncio
2import concurrent.futures
3import time
45def cpu_bound(number):
6"""CPU-bound task: find sum of squares."""
7return sum(i * i for i in range(number))
89async def cpu_bound_async(number):
10"""Run CPU-bound task in a thread pool."""
11loop = asyncio.get_running_loop()
12with concurrent.futures.ProcessPoolExecutor() as pool:
13return await loop.run_in_executor(pool, cpu_bound, number)
1415async def io_bound(delay):
16"""I/O-bound task: just wait."""
17print(f"IO task started, waiting for {delay} seconds")
18await asyncio.sleep(delay)
19print(f"IO task finished after {delay} seconds")
20return f"IO result after {delay}s"
2122async def main():
23start_time = time.time()
24
25# Combine CPU-bound and I/O-bound tasks
26cpu_tasks = [cpu_bound_async(num) for num in [10000000, 20000000, 30000000]]
27io_tasks = [io_bound(delay) for delay in [1, 2, 3]]
28
29# Run all tasks concurrently
30all_results = await asyncio.gather(*(cpu_tasks + io_tasks))
31
32end_time = time.time()
33print(f"All tasks completed in {end_time - start_time:.2f} seconds")
34print(f"Results: {all_results}")
3536if __name__ == "__main__":
37asyncio.run(main())