Python Programming: Advanced Concepts

1. Advanced Data Structures

Collections Module

Python's collections module provides specialized container datatypes that go beyond the built-in types:

python
1from collections import Counter, defaultdict, namedtuple, deque
2
3# Counter - count occurrences of elements
4words = ['apple', 'orange', 'apple', 'banana', 'apple', 'orange']
5word_counts = Counter(words)
6print(word_counts) # Counter({'apple': 3, 'orange': 2, 'banana': 1})
7print(word_counts.most_common(1)) # [('apple', 3)]
8
9# defaultdict - dictionary with default values
10fruit_colors = defaultdict(list)
11fruit_colors['apple'].append('red') # No KeyError if key doesn't exist
12fruit_colors['apple'].append('green')
13fruit_colors['banana'].append('yellow')
14print(dict(fruit_colors)) # {'apple': ['red', 'green'], 'banana': ['yellow']}
15
16# namedtuple - tuple with named fields
17Person = namedtuple('Person', ['name', 'age', 'city'])
18alice = Person('Alice', 30, 'New York')
19print(alice.name, alice.age, alice.city) # Alice 30 New York
20print(alice[0], alice[1], alice[2]) # Alice 30 New York
21
22# deque - double-ended queue with fast appends and pops
23queue = deque(['a', 'b', 'c'])
24queue.append('d') # Add to right
25queue.appendleft('z') # Add to left
26print(queue) # deque(['z', 'a', 'b', 'c', 'd'])
27print(queue.pop()) # 'd'
28print(queue.popleft()) # 'z'
29print(queue) # deque(['a', 'b', 'c'])

Comprehensions

python
1# List comprehension
2numbers = [1, 2, 3, 4, 5]
3squares = [x**2 for x in numbers]
4print(squares) # [1, 4, 9, 16, 25]
5
6# List comprehension with condition
7even_squares = [x**2 for x in numbers if x % 2 == 0]
8print(even_squares) # [4, 16]
9
10# Dictionary comprehension
11square_dict = {x: x**2 for x in numbers}
12print(square_dict) # {1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
13
14# Set comprehension
15square_set = {x**2 for x in numbers}
16print(square_set) # {1, 4, 9, 16, 25}
17
18# Generator expression (memory efficient)
19sum_of_squares = sum(x**2 for x in numbers)
20print(sum_of_squares) # 55

Advanced Dictionary Operations

python
1# Merging dictionaries (Python 3.9+)
2dict1 = {'a': 1, 'b': 2}
3dict2 = {'b': 3, 'c': 4}
4merged = dict1 | dict2 # {'a': 1, 'b': 3, 'c': 4}
5print(merged)
6
7# Dictionary unpacking
8user = {'name': 'Alice', 'age': 30}
9defaults = {'country': 'USA', 'language': 'English'}
10full_profile = {**defaults, **user}
11print(full_profile) # {'country': 'USA', 'language': 'English', 'name': 'Alice', 'age': 30}
12
13# Dictionary views
14student = {'name': 'Bob', 'grades': [85, 90, 78]}
15keys = student.keys()
16values = student.values()
17items = student.items()
18
19print(keys) # dict_keys(['name', 'grades'])
20print(values) # dict_values(['Bob', [85, 90, 78]])
21print(items) # dict_items([('name', 'Bob'), ('grades', [85, 90, 78])])
22
23# Views are dynamic
24student['age'] = 20
25print(keys) # dict_keys(['name', 'grades', 'age'])

2. File Handling

Reading and Writing Text Files

python
# Writing to a text file
# An explicit encoding keeps the example portable across platforms whose
# default text encoding differs.
with open('example.txt', 'w', encoding='utf-8') as file:
    file.write('Hello, Python!\n')
    file.write('File handling is essential.\n')

# Reading from a text file
with open('example.txt', 'r', encoding='utf-8') as file:
    content = file.read()
    print(content)

# Reading line by line
with open('example.txt', 'r', encoding='utf-8') as file:
    for line in file:
        # strip() removes surrounding whitespace, including the trailing newline
        print(line.strip())

# Appending to a file
with open('example.txt', 'a', encoding='utf-8') as file:
    file.write('This line is appended.\n')

Working with CSV Files

python
1import csv
2
3# Writing CSV data
4data = [
5 ['Name', 'Age', 'Country'],
6 ['Alice', 30, 'USA'],
7 ['Bob', 25, 'Canada'],
8 ['Charlie', 35, 'UK']
9]
10
11with open('people.csv', 'w', newline='') as file:
12 writer = csv.writer(file)
13 writer.writerows(data)
14
15# Reading CSV data
16with open('people.csv', 'r', newline='') as file:
17 reader = csv.reader(file)
18 for row in reader:
19 print(row)
20
21# Using DictReader and DictWriter
22with open('people.csv', 'r', newline='') as file:
23 reader = csv.DictReader(file)
24 for row in reader:
25 print(f"{row['Name']} is {row['Age']} years old from {row['Country']}")

Working with JSON

python
1import json
2
3# Python dictionary
4user = {
5 'name': 'Alice',
6 'age': 30,
7 'is_active': True,
8 'skills': ['Python', 'JavaScript', 'SQL'],
9 'address': {
10 'city': 'New York',
11 'country': 'USA'
12 }
13}
14
15# Convert Python object to JSON string
16json_string = json.dumps(user, indent=4)
17print(json_string)
18
19# Write JSON to file
20with open('user.json', 'w') as file:
21 json.dump(user, file, indent=4)
22
23# Read JSON from file
24with open('user.json', 'r') as file:
25 loaded_user = json.load(file)
26 print(loaded_user)
27
28# Parse JSON string
29json_data = '{"name": "Bob", "age": 25}'
30parsed_data = json.loads(json_data)
31print(parsed_data['name']) # Bob

Working with Binary Files

python
1# Writing binary data
2with open('binary_file.bin', 'wb') as file:
3 file.write(b'\x48\x65\x6c\x6c\x6f') # "Hello" in hex
4
5# Reading binary data
6with open('binary_file.bin', 'rb') as file:
7 data = file.read()
8 print(data) # b'Hello'
9 print(data.decode('utf-8')) # Convert bytes to string: "Hello"

3. Exception Handling

Basic Exception Handling

python
1# Basic try-except
2try:
3 x = 10 / 0 # This will raise a ZeroDivisionError
4except ZeroDivisionError:
5 print("Cannot divide by zero!")
6
7# Handling multiple exceptions
8try:
9 num = int(input("Enter a number: "))
10 result = 10 / num
11 print(f"Result: {result}")
12except ValueError:
13 print("That's not a valid number!")
14except ZeroDivisionError:
15 print("Cannot divide by zero!")
16
17# Using else clause (runs if no exception occurs)
18try:
19 num = int(input("Enter a number: "))
20 result = 10 / num
21except (ValueError, ZeroDivisionError) as e:
22 print(f"Error: {e}")
23else:
24 print(f"Result: {result}")
25
# Using finally clause (always runs)
file = None  # Bind the name up front so the finally block can test it safely
try:
    file = open("example.txt", "r")
    content = file.read()
except FileNotFoundError:
    print("File not found!")
finally:
    # This runs whether an exception occurred or not
    print("Execution completed")
    # Close the file only if open() actually succeeded
    if file is not None and not file.closed:
        file.close()

Creating Custom Exceptions

python
# Define custom exception
class InsufficientFundsError(Exception):
    """Raised when a withdrawal exceeds the available balance.

    Attributes:
        balance: Balance available when the withdrawal was attempted.
        amount: Amount that was requested.
        message: Human-readable description, also passed to ``Exception``.
    """

    def __init__(self, balance, amount):
        self.balance = balance
        self.amount = amount
        # No space after "$", so the text reads "$150" rather than "$ 150".
        self.message = f"Cannot withdraw ${amount}. Only ${balance} available."
        super().__init__(self.message)
9
# Using custom exception
class BankAccount:
    """A minimal account that tracks a single numeric balance."""

    def __init__(self, balance=0):
        self.balance = balance

    def deposit(self, amount):
        """Add *amount* to the balance and return the new balance.

        Raises:
            ValueError: If *amount* is negative.
        """
        if amount < 0:
            raise ValueError("Deposit amount cannot be negative")
        self.balance += amount
        return self.balance

    def withdraw(self, amount):
        """Subtract *amount* from the balance and return the new balance.

        Raises:
            ValueError: If *amount* is negative (it would otherwise act as
                a deposit).
            InsufficientFundsError: If *amount* exceeds the balance.
        """
        if amount < 0:
            raise ValueError("Withdrawal amount cannot be negative")
        if amount > self.balance:
            raise InsufficientFundsError(self.balance, amount)
        self.balance -= amount
        return self.balance
24
25# Test the custom exception
26account = BankAccount(100)
27try:
28 account.withdraw(150)
29except InsufficientFundsError as e:
30 print(e) # Cannot withdraw $150. Only $100 available.

Context Managers with try-finally

python
# Context manager using class
class FileManager:
    """Context manager that opens a file on entry and closes it on exit."""

    def __init__(self, filename, mode):
        self.filename = filename
        self.mode = mode
        self.file = None  # Set in __enter__

    def __enter__(self):
        """Open the file and return the file object bound by ``as``."""
        self.file = open(self.filename, self.mode)
        return self.file

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the file; exceptions from the ``with`` body propagate."""
        if self.file:
            self.file.close()
        # Return True to suppress exception, False to propagate it
        return False
17
18# Using the context manager
19with FileManager('example.txt', 'w') as file:
20 file.write('Using a custom context manager')
21
# Context manager using decorator
from contextlib import contextmanager


@contextmanager
def open_file(filename, mode):
    """Open *filename* in *mode*, yield the file object, and always close it.

    Raises:
        OSError: Whatever ``open()`` raises (e.g. ``FileNotFoundError``)
            propagates unchanged.
    """
    # Open outside the try block: if open() fails there is nothing to close,
    # and the real error propagates instead of an UnboundLocalError raised
    # by the finally clause.
    file = open(filename, mode)
    try:
        yield file
    finally:
        file.close()
32
33# Using the decorator-based context manager
34with open_file('example.txt', 'r') as file:
35 content = file.read()
36 print(content)

4. Decorators and Generators

Decorators

Decorators are functions that modify the behavior of other functions or methods:

python
# Basic decorator
def my_decorator(func):
    """Wrap *func* so a message is printed before and after each call.

    The wrapper forwards all arguments to *func* and returns its result.
    """
    from functools import wraps  # Local import keeps the snippet self-contained

    @wraps(func)  # Preserve func's __name__ and __doc__ on the wrapper
    def wrapper(*args, **kwargs):
        print("Something is happening before the function is called.")
        result = func(*args, **kwargs)
        print("Something is happening after the function is called.")
        return result

    return wrapper
8
9@my_decorator
10def say_hello():
11 print("Hello!")
12
13say_hello()
14# Output:
15# Something is happening before the function is called.
16# Hello!
17# Something is happening after the function is called.
18
# Decorator with arguments
def repeat(n):
    """Return a decorator that calls the wrapped function *n* times.

    The wrapper returns the result of the last call, or ``None`` when
    *n* is zero or negative and the function is never called.
    """
    from functools import wraps  # Local import keeps the snippet self-contained

    def decorator(func):
        @wraps(func)  # Preserve func's __name__ and __doc__ on the wrapper
        def wrapper(*args, **kwargs):
            result = None  # Defined even when the loop body never runs
            for _ in range(n):
                result = func(*args, **kwargs)
            return result

        return wrapper

    return decorator
28
29@repeat(3)
30def greet(name):
31 print(f"Hello, {name}!")
32
33greet("Alice")
34# Output:
35# Hello, Alice!
36# Hello, Alice!
37# Hello, Alice!
38
# Practical example: timing decorator
import time


def timing_decorator(func):
    """Wrap *func* so each call prints how long it took and returns its result."""
    from functools import wraps  # Local import keeps the snippet self-contained

    @wraps(func)  # Preserve func's __name__ and __doc__ on the wrapper
    def wrapper(*args, **kwargs):
        # perf_counter() is monotonic and high-resolution; time.time() can
        # jump when the system clock is adjusted.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        end_time = time.perf_counter()
        print(f"{func.__name__} took {end_time - start_time:.4f} seconds to run")
        return result

    return wrapper
50
51@timing_decorator
52def slow_function():
53 time.sleep(1)
54 return "Function completed"
55
56print(slow_function())
57# Output:
58# slow_function took 1.0012 seconds to run
59# Function completed

Generators

Generators are functions that can pause and resume their execution, yielding values one at a time:

python
# Basic generator
def count_up_to(n):
    """Yield the integers 1, 2, ... up to and including *n*.

    Yields nothing when *n* is less than 1.
    """
    i = 1
    while i <= n:
        yield i  # Execution pauses here until the next value is requested
        i += 1
7
8# Using the generator
9counter = count_up_to(5)
10print(next(counter)) # 1
11print(next(counter)) # 2
12print(next(counter)) # 3
13
14# Iterating over a generator
15for num in count_up_to(5):
16 print(num) # Prints 1, 2, 3, 4, 5
17
18# Generator expressions
19squares = (x**2 for x in range(1, 6))
20print(list(squares)) # [1, 4, 9, 16, 25]
21
22# Memory efficiency example
23import sys
24
25# List comprehension (stores all values in memory)
26list_comp = [x**2 for x in range(10000)]
27# Generator expression (generates values on-the-fly)
28gen_exp = (x**2 for x in range(10000))
29
30print(f"List size: {sys.getsizeof(list_comp)} bytes")
31print(f"Generator size: {sys.getsizeof(gen_exp)} bytes")
32
# Infinite sequence generator
def fibonacci():
    """Yield the Fibonacci numbers 0, 1, 1, 2, 3, ... without end.

    The generator never finishes on its own; callers decide how many
    values to take (for example with ``next()`` or ``itertools.islice``).
    """
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b
39
40# Get first 10 Fibonacci numbers
41fib = fibonacci()
42for _ in range(10):
43 print(next(fib), end=" ") # 0 1 1 2 3 5 8 13 21 34

Combining Decorators and Generators

python
# Decorator for a generator
def debug_generator(func):
    """Wrap generator function *func* so every yielded value is printed.

    The wrapper is itself a generator: it prints each value and then
    yields it on unchanged.
    """
    from functools import wraps  # Local import keeps the snippet self-contained

    @wraps(func)  # Preserve func's __name__ and __doc__ on the wrapper
    def wrapper(*args, **kwargs):
        for value in func(*args, **kwargs):
            print(f"Generator yielded: {value}")
            yield value

    return wrapper
9
10@debug_generator
11def numbers(n):
12 for i in range(n):
13 yield i * i
14
15# Using the decorated generator
16for num in numbers(5):
17 pass # The decorator will print each yielded value

5. Working with APIs

Python makes it easy to interact with web APIs using the third-party requests library (install it with pip install requests):

python
1import requests
2
3# Making a GET request
4response = requests.get('https://jsonplaceholder.typicode.com/posts/1')
5print(f"Status code: {response.status_code}")
6print(f"Content type: {response.headers['Content-Type']}")
7
8# Parsing JSON response
9data = response.json()
10print(f"Post title: {data['title']}")
11
12# Making a GET request with parameters
13params = {'userId': 1}
14response = requests.get('https://jsonplaceholder.typicode.com/posts', params=params)
15posts = response.json()
16print(f"Number of posts by user 1: {len(posts)}")
17
18# Making a POST request
19new_post = {
20 'title': 'Python API Tutorial',
21 'body': 'This is a post about using APIs with Python',
22 'userId': 1
23}
24response = requests.post('https://jsonplaceholder.typicode.com/posts', json=new_post)
25created_post = response.json()
26print(f"Created post ID: {created_post['id']}")
27
28# Making a PUT request (update)
29updated_post = {
30 'id': 1,
31 'title': 'Updated Title',
32 'body': 'This post has been updated',
33 'userId': 1
34}
35response = requests.put('https://jsonplaceholder.typicode.com/posts/1', json=updated_post)
36print(f"Update status: {response.status_code}")
37
38# Making a DELETE request
39response = requests.delete('https://jsonplaceholder.typicode.com/posts/1')
40print(f"Delete status: {response.status_code}")
41
42# Handling authentication
43response = requests.get(
44 'https://api.github.com/user',
45 auth=('username', 'personal_access_token')
46)
47
48# Using sessions for multiple requests
49session = requests.Session()
50session.headers.update({'User-Agent': 'Python API Tutorial'})
51response = session.get('https://api.github.com/repos/python/cpython')
52print(response.json()['description'])

Async API Requests

python
1import asyncio
2import aiohttp
3import time
4
5async def fetch_data(session, url):
6 async with session.get(url) as response:
7 return await response.json()
8
9async def main():
10 start_time = time.time()
11
12 # Create a session
13 async with aiohttp.ClientSession() as session:
14 # Create tasks for concurrent execution
15 tasks = []
16 for i in range(1, 11):
17 url = f'https://jsonplaceholder.typicode.com/posts/{i}'
18 tasks.append(fetch_data(session, url))
19
20 # Wait for all tasks to complete
21 results = await asyncio.gather(*tasks)
22
23 # Process results
24 for i, result in enumerate(results, 1):
25 print(f"Post {i} title: {result['title'][:30]}...")
26
27 end_time = time.time()
28 print(f"Fetched 10 posts in {end_time - start_time:.2f} seconds")
29
30# Run the async function
31asyncio.run(main())

6. Data Analysis with Pandas

Pandas is a powerful library for data manipulation and analysis:

python
1import pandas as pd
2import numpy as np
3import matplotlib.pyplot as plt
4
5# Creating a DataFrame
6data = {
7 'Name': ['Alice', 'Bob', 'Charlie', 'David', 'Eva'],
8 'Age': [25, 30, 35, 40, 45],
9 'City': ['New York', 'Boston', 'Chicago', 'Boston', 'New York'],
10 'Salary': [50000, 60000, 70000, 80000, 90000]
11}
12
13df = pd.DataFrame(data)
14print(df)
15
# Basic DataFrame operations
print(df.head(2))  # First 2 rows
print(df.tail(2))  # Last 2 rows
# info() prints its report itself and returns None, so wrapping it in
# print() would add a stray "None" line.
df.info()  # DataFrame info
print(df.describe())  # Statistical summary
21
22# Selecting data
23print(df['Name']) # Select a column
24print(df[['Name', 'Age']]) # Select multiple columns
25print(df.loc[0]) # Select a row by label
26print(df.iloc[0]) # Select a row by position
27print(df.loc[0:2, 'Name':'City']) # Select rows and columns by label
28print(df.iloc[0:2, 0:2]) # Select rows and columns by position
29
30# Filtering data
31print(df[df['Age'] > 30]) # Filter by condition
32print(df[(df['Age'] > 30) & (df['Salary'] > 70000)]) # Multiple conditions
33
34# Adding and modifying columns
35df['Experience'] = [3, 5, 8, 12, 15] # Add a new column
36df['Salary'] = df['Salary'] * 1.1 # Modify a column
37print(df)
38
# Grouping and aggregation
city_groups = df.groupby('City')
# numeric_only=True skips the text column 'Name'; pandas 2.0+ raises a
# TypeError when asked to average strings.
print(city_groups.mean(numeric_only=True))  # Mean of numeric columns by city
print(city_groups.agg({
    'Age': 'mean',
    'Salary': ['min', 'max', 'mean'],
    'Experience': 'sum',
}))
47
48# Handling missing values
49df_missing = df.copy()
50df_missing.loc[0, 'Age'] = np.nan
51df_missing.loc[2, 'Salary'] = np.nan
52
53print(df_missing.isna().sum()) # Count missing values
54print(df_missing.fillna(0)) # Fill missing values with 0
55print(df_missing.dropna()) # Drop rows with missing values
56
57# Merging DataFrames
58df1 = pd.DataFrame({
59 'ID': [1, 2, 3, 4],
60 'Name': ['Alice', 'Bob', 'Charlie', 'David'],
61 'Department': ['HR', 'IT', 'Finance', 'IT']
62})
63
64df2 = pd.DataFrame({
65 'ID': [1, 2, 3, 5],
66 'Salary': [50000, 60000, 70000, 90000],
67 'Experience': [3, 5, 8, 15]
68})
69
70# Inner join
71merged_inner = pd.merge(df1, df2, on='ID', how='inner')
72print(merged_inner)
73
74# Outer join
75merged_outer = pd.merge(df1, df2, on='ID', how='outer')
76print(merged_outer)
77
78# Basic visualization
79df.plot(kind='bar', x='Name', y='Salary', figsize=(10, 6))
80plt.title('Salary by Employee')
81plt.ylabel('Salary ($)')
82plt.tight_layout()
83plt.show()

Time Series Analysis

python
1# Create a time series
2dates = pd.date_range('20230101', periods=12, freq='M')
3ts = pd.Series(np.random.randn(12) * 100 + 500, index=dates)
4print(ts)
5
6# Resampling
7print(ts.resample('Q').mean()) # Quarterly average
8print(ts.resample('Q').agg(['min', 'max', 'mean'])) # Multiple aggregations
9
10# Rolling statistics
11print(ts.rolling(window=3).mean()) # 3-month rolling average
12
13# Shifting
14print(ts.shift(1)) # Shift values forward by 1
15print(ts.diff()) # Difference between current and previous value
16
17# Time series visualization
18ts.plot(figsize=(10, 6))
19ts.rolling(window=3).mean().plot(style='r--')
20plt.title('Monthly Values with 3-Month Rolling Average')
21plt.ylabel('Value')
22plt.legend(['Original', '3-Month Avg'])
23plt.tight_layout()
24plt.show()

7. Web Development with Flask

Flask is a lightweight web framework for Python:

python
1from flask import Flask, render_template, request, redirect, url_for, jsonify
2
3# Create a Flask application
4app = Flask(__name__)
5
6# Sample data
7tasks = [
8 {'id': 1, 'title': 'Learn Python', 'done': True},
9 {'id': 2, 'title': 'Learn Flask', 'done': False},
10 {'id': 3, 'title': 'Build a web app', 'done': False}
11]
12
13# Route for the home page
14@app.route('/')
15def index():
16 return render_template('index.html', tasks=tasks)
17
18# Route to get all tasks (API)
19@app.route('/api/tasks', methods=['GET'])
20def get_tasks():
21 return jsonify({'tasks': tasks})
22
23# Route to get a specific task (API)
24@app.route('/api/tasks/<int:task_id>', methods=['GET'])
25def get_task(task_id):
26 task = next((task for task in tasks if task['id'] == task_id), None)
27 if task:
28 return jsonify({'task': task})
29 return jsonify({'error': 'Task not found'}), 404
30
# Route to create a new task (API)
@app.route('/api/tasks', methods=['POST'])
def create_task():
    """Create a task from a JSON body containing a ``title`` field.

    Returns:
        The new task with status 201, or an error with status 400 when the
        body is missing, is not a JSON object, or has no ``title``.
    """
    # silent=True yields None for a missing or malformed JSON body, so the
    # uniform 400 response below is returned in those cases too.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict) or 'title' not in payload:
        return jsonify({'error': 'Title is required'}), 400

    # default=0 keeps ID generation working once every task has been deleted.
    task_id = max((task['id'] for task in tasks), default=0) + 1
    task = {
        'id': task_id,
        'title': payload['title'],
        'done': False,
    }
    tasks.append(task)
    return jsonify({'task': task}), 201
45
# Route to update a task (API)
@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
def update_task(task_id):
    """Update the ``title`` and/or ``done`` fields of an existing task.

    Returns:
        The updated task, an error with status 404 when no task has
        *task_id*, or an error with status 400 when the body is not a
        JSON object.
    """
    task = next((task for task in tasks if task['id'] == task_id), None)
    if not task:
        return jsonify({'error': 'Task not found'}), 404

    # silent=True yields None for a missing or malformed JSON body; without
    # this guard the membership tests below would raise a TypeError.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({'error': 'JSON body is required'}), 400

    if 'title' in payload:
        task['title'] = payload['title']
    if 'done' in payload:
        task['done'] = payload['done']

    return jsonify({'task': task})
59
60# Route to delete a task (API)
61@app.route('/api/tasks/<int:task_id>', methods=['DELETE'])
62def delete_task(task_id):
63 task = next((task for task in tasks if task['id'] == task_id), None)
64 if not task:
65 return jsonify({'error': 'Task not found'}), 404
66
67 tasks.remove(task)
68 return jsonify({'result': True})
69
# Form submission route
@app.route('/add', methods=['POST'])
def add_task():
    """Add a task from the submitted form, then redirect to the home page.

    A missing or empty ``title`` field is ignored and no task is added.
    """
    title = request.form.get('title')
    if title:
        # default=0 keeps ID generation working when the task list is empty.
        task_id = max((task['id'] for task in tasks), default=0) + 1
        tasks.append({'id': task_id, 'title': title, 'done': False})
    return redirect(url_for('index'))
78
79# Toggle task status route
80@app.route('/toggle/<int:task_id>')
81def toggle_task(task_id):
82 task = next((task for task in tasks if task['id'] == task_id), None)
83 if task:
84 task['done'] = not task['done']
85 return redirect(url_for('index'))
86
87# Delete task route
88@app.route('/delete/<int:task_id>')
89def remove_task(task_id):
90 task = next((task for task in tasks if task['id'] == task_id), None)
91 if task:
92 tasks.remove(task)
93 return redirect(url_for('index'))
94
95if __name__ == '__main__':
96 app.run(debug=True)

HTML Template Example

Here's a simple HTML template for the Flask app (save as templates/index.html):

html
1<!DOCTYPE html>
2<html lang="en">
3<head>
4 <meta charset="UTF-8">
5 <meta name="viewport" content="width=device-width, initial-scale=1.0">
6 <title>Flask Todo App</title>
7 <style>
8 body {
9 font-family: Arial, sans-serif;
10 max-width: 800px;
11 margin: 0 auto;
12 padding: 20px;
13 }
14 .task {
15 display: flex;
16 align-items: center;
17 padding: 10px;
18 border-bottom: 1px solid #eee;
19 }
20 .task.done {
21 text-decoration: line-through;
22 color: #888;
23 }
24 .task-actions {
25 margin-left: auto;
26 }
27 form {
28 display: flex;
29 margin-bottom: 20px;
30 }
31 input[type="text"] {
32 flex-grow: 1;
33 padding: 10px;
34 border: 1px solid #ddd;
35 border-radius: 4px 0 0 4px;
36 }
37 button {
38 padding: 10px 15px;
39 background-color: #4CAF50;
40 color: white;
41 border: none;
42 border-radius: 0 4px 4px 0;
43 cursor: pointer;
44 }
45 .task-actions a {
46 margin-left: 10px;
47 color: #666;
48 text-decoration: none;
49 }
50 </style>
51</head>
52<body>
53 <h1>Todo List</h1>
54
55 <form action="/add" method="post">
56 <input type="text" name="title" placeholder="Add a new task..." required>
57 <button type="submit">Add</button>
58 </form>
59
60 <div class="tasks">
61 {% for task in tasks %}
62 <div class="task {% if task.done %}done{% endif %}">
63 <span>{{ task.title }}</span>
64 <div class="task-actions">
65 <a href="/toggle/{{ task.id }}">{% if task.done %}Undo{% else %}Done{% endif %}</a>
66 <a href="/delete/{{ task.id }}">Delete</a>
67 </div>
68 </div>
69 {% endfor %}
70 </div>
71</body>
72</html>

8. Testing in Python

Unit Testing with unittest

Python's built-in unittest framework allows you to write and run tests:

python
1import unittest
2
3# The code to test
4def add(a, b):
5 return a + b
6
7def subtract(a, b):
8 return a - b
9
10def multiply(a, b):
11 return a * b
12
13def divide(a, b):
14 if b == 0:
15 raise ValueError("Cannot divide by zero")
16 return a / b
17
18# Test case class
19class TestMathFunctions(unittest.TestCase):
20
21 def test_add(self):
22 self.assertEqual(add(10, 5), 15)
23 self.assertEqual(add(-1, 1), 0)
24 self.assertEqual(add(-1, -1), -2)
25
26 def test_subtract(self):
27 self.assertEqual(subtract(10, 5), 5)
28 self.assertEqual(subtract(-1, 1), -2)
29 self.assertEqual(subtract(-1, -1), 0)
30
31 def test_multiply(self):
32 self.assertEqual(multiply(10, 5), 50)
33 self.assertEqual(multiply(-1, 1), -1)
34 self.assertEqual(multiply(-1, -1), 1)
35
36 def test_divide(self):
37 self.assertEqual(divide(10, 5), 2)
38 self.assertEqual(divide(-1, 1), -1)
39 self.assertEqual(divide(5, 2), 2.5)
40
41 # Test division by zero
42 with self.assertRaises(ValueError):
43 divide(10, 0)
44
45 # Setup and teardown methods
46 def setUp(self):
47 # Code to run before each test
48 print("Setting up test...")
49
50 def tearDown(self):
51 # Code to run after each test
52 print("Tearing down test...")
53
54 @classmethod
55 def setUpClass(cls):
56 # Code to run once before all tests
57 print("Setting up test class...")
58
59 @classmethod
60 def tearDownClass(cls):
61 # Code to run once after all tests
62 print("Tearing down test class...")
63
64if __name__ == '__main__':
65 unittest.main()

Testing with pytest

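pytest is a popular third-party testing framework (install it with pip install pytest) that uses plain assert statements and needs less boilerplate than unittest: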

python
1# Save as test_math.py
2import pytest
3
4# The code to test
5def add(a, b):
6 return a + b
7
8def divide(a, b):
9 if b == 0:
10 raise ValueError("Cannot divide by zero")
11 return a / b
12
13# Test functions
14def test_add():
15 assert add(10, 5) == 15
16 assert add(-1, 1) == 0
17 assert add(-1, -1) == -2
18
19def test_divide():
20 assert divide(10, 5) == 2
21 assert divide(-1, 1) == -1
22 assert divide(5, 2) == 2.5
23
24def test_divide_by_zero():
25 with pytest.raises(ValueError):
26 divide(10, 0)
27
28# Parameterized tests
29@pytest.mark.parametrize("a, b, expected", [
30 (10, 5, 15),
31 (-1, 1, 0),
32 (-1, -1, -2),
33 (0, 0, 0)
34])
35def test_add_parameterized(a, b, expected):
36 assert add(a, b) == expected
37
38# Fixtures
39@pytest.fixture
40def sample_data():
41 return [1, 2, 3, 4, 5]
42
43def test_with_fixture(sample_data):
44 assert len(sample_data) == 5
45 assert sum(sample_data) == 15

Mock Objects

Using mock objects to isolate code for testing:

python
1import unittest
2from unittest.mock import Mock, patch
3
4# Function that makes an API call
5def get_user_data(user_id):
6 # In a real app, this would call an API
7 response = make_api_request(f"https://api.example.com/users/{user_id}")
8 if response.status_code == 200:
9 return response.json()
10 return None
11
12# Function that processes user data
13def process_user(user_id):
14 user_data = get_user_data(user_id)
15 if user_data and user_data.get('active'):
16 return f"Active user: {user_data['name']}"
17 return "Inactive or missing user"
18
# Test case using mocks
class TestUserProcessing(unittest.TestCase):
    """Tests for process_user() with get_user_data() replaced by a mock.

    The patch target is built from ``__name__`` so the tests work both when
    this file is run directly (module name ``__main__``) and when it is
    imported by a test runner under its real module name.
    """

    def test_process_active_user(self):
        # Create a mock for get_user_data
        mock_get_data = Mock(return_value={
            'id': 123,
            'name': 'Test User',
            'active': True,
        })

        # Patch the get_user_data function
        with patch(f'{__name__}.get_user_data', mock_get_data):
            result = process_user(123)
            self.assertEqual(result, "Active user: Test User")
            mock_get_data.assert_called_once_with(123)

    def test_process_inactive_user(self):
        # Create a mock for get_user_data
        mock_get_data = Mock(return_value={
            'id': 456,
            'name': 'Inactive User',
            'active': False,
        })

        # Patch the get_user_data function
        with patch(f'{__name__}.get_user_data', mock_get_data):
            result = process_user(456)
            self.assertEqual(result, "Inactive or missing user")

    def test_process_missing_user(self):
        # Create a mock for get_user_data
        mock_get_data = Mock(return_value=None)

        # Patch the get_user_data function
        with patch(f'{__name__}.get_user_data', mock_get_data):
            result = process_user(789)
            self.assertEqual(result, "Inactive or missing user")

9. Concurrency and Parallelism

Threading

python
1import threading
2import time
3
4def worker(name, delay):
5 print(f"{name} started")
6 time.sleep(delay)
7 print(f"{name} finished")
8
9# Create threads
10thread1 = threading.Thread(target=worker, args=("Thread-1", 2))
11thread2 = threading.Thread(target=worker, args=("Thread-2", 4))
12
13# Start threads
14start_time = time.time()
15thread1.start()
16thread2.start()
17
18# Wait for threads to complete
19thread1.join()
20thread2.join()
21
22end_time = time.time()
23print(f"All threads completed in {end_time - start_time:.2f} seconds")

Multiprocessing

python
1import multiprocessing
2import time
3
4def cpu_bound_task(number):
5 """A CPU-bound task that computes the sum of squares."""
6 return sum(i * i for i in range(number))
7
8def process_worker(numbers):
9 """Process a list of numbers using a single process."""
10 start_time = time.time()
11 results = [cpu_bound_task(number) for number in numbers]
12 end_time = time.time()
13 print(f"Sequential processing took {end_time - start_time:.2f} seconds")
14 return results
15
16def process_worker_parallel(numbers):
17 """Process a list of numbers using multiple processes."""
18 start_time = time.time()
19
20 # Create a pool of processes
21 with multiprocessing.Pool() as pool:
22 # Map the function to the list of numbers
23 results = pool.map(cpu_bound_task, numbers)
24
25 end_time = time.time()
26 print(f"Parallel processing took {end_time - start_time:.2f} seconds")
27 return results
28
29if __name__ == "__main__":
30 # List of numbers to process
31 numbers = [10000000, 20000000, 30000000, 40000000]
32
33 # Process sequentially
34 sequential_results = process_worker(numbers)
35
36 # Process in parallel
37 parallel_results = process_worker_parallel(numbers)
38
39 # Verify results are the same
40 print(f"Results match: {sequential_results == parallel_results}")

Asyncio

python
1import asyncio
2import time
3
4async def async_task(name, delay):
5 print(f"{name} started at {time.strftime('%X')}")
6 await asyncio.sleep(delay) # Non-blocking sleep
7 print(f"{name} finished at {time.strftime('%X')}")
8 return f"{name} completed"
9
10async def main():
11 # Create tasks
12 task1 = asyncio.create_task(async_task("Task-1", 2))
13 task2 = asyncio.create_task(async_task("Task-2", 3))
14 task3 = asyncio.create_task(async_task("Task-3", 1))
15
16 print(f"Started at {time.strftime('%X')}")
17
18 # Wait for all tasks to complete
19 results = await asyncio.gather(task1, task2, task3)
20
21 print(f"Finished at {time.strftime('%X')}")
22 print(f"Results: {results}")
23
24# Run the async program
25asyncio.run(main())

Combining Asyncio with Multiprocessing

python
1import asyncio
2import concurrent.futures
3import time
4
def cpu_bound(number):
    """CPU-bound task: return the sum of squares of 0 .. number - 1."""
    return sum(i * i for i in range(number))


async def cpu_bound_async(number, executor=None):
    """Run ``cpu_bound(number)`` in an executor and await its result.

    Args:
        number: Exclusive upper bound passed to ``cpu_bound``.
        executor: Optional ``concurrent.futures.Executor`` to run the task
            in. When omitted, a ``ProcessPoolExecutor`` (a process pool,
            not a thread pool) is created for this call only.

    Returns:
        The value returned by ``cpu_bound(number)``.
    """
    loop = asyncio.get_running_loop()
    if executor is not None:
        # Reuse the caller's executor instead of creating a pool per call.
        return await loop.run_in_executor(executor, cpu_bound, number)
    with concurrent.futures.ProcessPoolExecutor() as pool:
        return await loop.run_in_executor(pool, cpu_bound, number)
14
15async def io_bound(delay):
16 """I/O-bound task: just wait."""
17 print(f"IO task started, waiting for {delay} seconds")
18 await asyncio.sleep(delay)
19 print(f"IO task finished after {delay} seconds")
20 return f"IO result after {delay}s"
21
22async def main():
23 start_time = time.time()
24
25 # Combine CPU-bound and I/O-bound tasks
26 cpu_tasks = [cpu_bound_async(num) for num in [10000000, 20000000, 30000000]]
27 io_tasks = [io_bound(delay) for delay in [1, 2, 3]]
28
29 # Run all tasks concurrently
30 all_results = await asyncio.gather(*(cpu_tasks + io_tasks))
31
32 end_time = time.time()
33 print(f"All tasks completed in {end_time - start_time:.2f} seconds")
34 print(f"Results: {all_results}")
35
36if __name__ == "__main__":
37 asyncio.run(main())