Python Database Programming in Action: From Single Thread to High Concurrency, A Complete Guide to Database Operations
Python database connection pool, concurrent database control, deadlock prevention, transaction management, database security

2024-11-12

Opening Thoughts

Have you ever wondered: Why does my database code always have issues? Everything works fine in local testing, but why do all sorts of exceptions occur in production? How should I configure the database connection pool? How should I manage transactions? Today, let's discuss these questions together.

As a Python developer, I know how much database programming matters. When I first started learning it, I ran into quite a few pitfalls. I used to forget to close database connections, which quickly exhausted the connection pool; sometimes I wrote a simple query but caused data inconsistency through improper transaction handling. These experiences taught me that mastering database programming involves much more than writing a few SQL statements.

Basic Knowledge

Before we start coding, let's understand two key concepts. A database connection is the session between an application and a database; it is also what commits or rolls back a transaction. A cursor is the object you use to execute SQL statements and step through their results. It's like going to a library: the connection is your library card, which gets you in the door, and the cursor is the bookmark that tracks where you are in the book you're reading.
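
To make the two concepts concrete, here is a minimal sketch using an in-memory SQLite database. The table name demo and its two rows are made up for illustration. The connection owns the session and the commit; the cursor runs statements and holds the result set.

```python
import sqlite3

# ":memory:" creates a throwaway database that lives only in RAM.
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()

cursor.execute("CREATE TABLE demo (id INTEGER PRIMARY KEY, name TEXT)")
cursor.executemany("INSERT INTO demo (name) VALUES (?)", [("alpha",), ("beta",)])
conn.commit()  # committing is a connection-level operation

cursor.execute("SELECT id, name FROM demo ORDER BY id")
rows = cursor.fetchall()  # the cursor hands back the result set

cursor.close()
conn.close()
```

Closing the cursor and then the connection, in that order, is the habit worth building early.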

Single-Thread Operations

Let's start with the simplest case: a single thread. With only one thread using the connection, there is no contention for resources, so we can focus on the API itself. Here is the most basic database setup code:

import sqlite3

def init_database():
    # Create database connection
    conn = sqlite3.connect('library.db')
    cursor = conn.cursor()

    # Create books table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS books (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            author TEXT NOT NULL,
            price REAL,
            stock INTEGER DEFAULT 0
        )
    ''')

    # Create users table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            balance REAL DEFAULT 0.0
        )
    ''')

    conn.commit()
    return conn

This code creates a database structure for a simple library management system. We created two tables: the books table stores book information, and the users table stores user information. Each book has title, author, price, and stock information, and each user has a username and account balance.

Basic CRUD Implementation

With the database structure in place, let's implement basic CRUD operations:

class BookManager:
    def __init__(self, conn):
        self.conn = conn
        self.cursor = conn.cursor()

    def add_book(self, title, author, price, stock):
        try:
            self.cursor.execute('''
                INSERT INTO books (title, author, price, stock)
                VALUES (?, ?, ?, ?)
            ''', (title, author, price, stock))
            self.conn.commit()
            return True
        except sqlite3.Error as e:
            print(f"Failed to add book: {e}")
            self.conn.rollback()
            return False

    def get_book(self, book_id):
        try:
            self.cursor.execute('SELECT * FROM books WHERE id = ?', (book_id,))
            return self.cursor.fetchone()
        except sqlite3.Error as e:
            print(f"Failed to query book: {e}")
            return None

This BookManager class encapsulates basic book operations. Notice how I handle exceptions - exception handling is crucial in production environments. Each operation can fail, and we need to handle these failures gracefully.
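
One way to trim the commit/rollback boilerplate is to use the connection itself as a context manager. In the standard sqlite3 module, "with conn:" commits if the block succeeds and rolls back if it raises; it does not close the connection. The sketch below is an alternative to the try/except pattern above, not a replacement for the BookManager class. The helper name add_book_safely, the in-memory schema, and the sample values are assumptions for illustration.

```python
import sqlite3

def add_book_safely(conn, title, author, price, stock):
    """Insert one book; return the new row id, or None on failure."""
    try:
        with conn:  # commits on success, rolls back on exception
            cur = conn.execute(
                "INSERT INTO books (title, author, price, stock) VALUES (?, ?, ?, ?)",
                (title, author, price, stock),
            )
            return cur.lastrowid
    except sqlite3.Error as e:
        print(f"Failed to add book: {e}")
        return None

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE books (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title TEXT NOT NULL,
        author TEXT NOT NULL,
        price REAL,
        stock INTEGER DEFAULT 0
    )
""")

ok_id = add_book_safely(conn, "Example Title", "Example Author", 10.0, 3)
bad_id = add_book_safely(conn, None, "Example Author", 10.0, 1)  # violates NOT NULL
count = conn.execute("SELECT COUNT(*) FROM books").fetchone()[0]
```

The second call fails the NOT NULL constraint, so it returns None and leaves the table with the single row from the first call.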

Advanced Operations

Connection Pool Implementation

When our application needs to handle many concurrent requests, a single database connection isn't enough. This is where connection pools come in:

import queue
import threading
from contextlib import contextmanager

class DatabasePool:
    def __init__(self, max_connections=5):
        self.max_connections = max_connections
        self.connections = queue.Queue(max_connections)
        self.lock = threading.Lock()

        # Initialize connection pool
        for _ in range(max_connections):
            conn = sqlite3.connect('library.db', check_same_thread=False)
            self.connections.put(conn)

    @contextmanager
    def get_connection(self):
        connection = self.connections.get()
        try:
            yield connection
        finally:
            self.connections.put(connection)

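    def close_all(self):
        # Close every idle connection. get_nowait() avoids the race where
        # empty() returns False but another thread takes the last connection
        # before get() runs, which would block forever.
        while True:
            try:
                conn = self.connections.get_nowait()
            except queue.Empty:
                break
            conn.close()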

This connection pool implementation has several key points:

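  1. A queue.Queue holds the connections; it is thread-safe, and get() blocks while every connection is in use
  2. A context manager hands out a connection and returns it to the pool even if the caller raises
  3. close_all() closes the idle connections at shutdown, so call it only after all workers have returned theirs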
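
A usage sketch may help here. The version below is a compact variant of the pool, not the class above verbatim: it adds a timeout on acquisition so that a caller fails fast instead of blocking forever when every connection is checked out. The class name TimedPool, the timeout parameter, the hits table, and the temporary database file are all assumptions for illustration.

```python
import os
import queue
import sqlite3
import tempfile
import threading
from contextlib import contextmanager

class TimedPool:
    def __init__(self, db_path, max_connections=2):
        self._connections = queue.Queue(max_connections)
        for _ in range(max_connections):
            # check_same_thread=False lets a thread other than the creator use the connection
            self._connections.put(sqlite3.connect(db_path, check_same_thread=False))

    @contextmanager
    def get_connection(self, timeout=5.0):
        # Raises queue.Empty if no connection frees up within `timeout` seconds.
        conn = self._connections.get(timeout=timeout)
        try:
            yield conn
        finally:
            self._connections.put(conn)

    def close_all(self):
        # Closes idle connections only; callers must have returned theirs first.
        while True:
            try:
                self._connections.get_nowait().close()
            except queue.Empty:
                break

db_path = os.path.join(tempfile.mkdtemp(), "pool_demo.db")
pool = TimedPool(db_path, max_connections=2)

with pool.get_connection() as conn:
    conn.execute("CREATE TABLE hits (worker INTEGER)")
    conn.commit()

def worker(n):
    with pool.get_connection() as conn:
        with conn:  # commit on success, roll back on error
            conn.execute("INSERT INTO hits (worker) VALUES (?)", (n,))

# Eight threads share two connections; the extra threads wait in get().
threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

with pool.get_connection() as conn:
    total = conn.execute("SELECT COUNT(*) FROM hits").fetchone()[0]

pool.close_all()
```

The pool size caps how many threads touch the database at once. SQLite itself still serializes writers, so the pool bounds resource usage rather than multiplying write throughput.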

Transaction Management

Transaction management becomes particularly important when handling complex business logic. For example, in a book purchase scenario:

class BookStore:
    def __init__(self, pool):
        self.pool = pool

    def purchase_book(self, user_id, book_id, quantity):
        with self.pool.get_connection() as conn:
            cursor = conn.cursor()
            try:
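                # Start the transaction and take the write lock up front, so a
                # concurrent purchase cannot change stock between our check and update
                cursor.execute('BEGIN IMMEDIATE')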

                # Check that the book exists and has enough stock
                cursor.execute('SELECT price, stock FROM books WHERE id = ?', (book_id,))
                book_info = cursor.fetchone()
                if book_info is None:
                    raise ValueError("Book not found")
                if book_info[1] < quantity:
                    raise ValueError("Insufficient stock")

                total_price = book_info[0] * quantity

                # Check that the user exists and can afford the purchase
                cursor.execute('SELECT balance FROM users WHERE id = ?', (user_id,))
                user_row = cursor.fetchone()
                if user_row is None:
                    raise ValueError("User not found")
                if user_row[0] < total_price:
                    raise ValueError("Insufficient balance")

                # Update stock
                cursor.execute('''
                    UPDATE books 
                    SET stock = stock - ? 
                    WHERE id = ?
                ''', (quantity, book_id))

                # Update user balance
                cursor.execute('''
                    UPDATE users 
                    SET balance = balance - ? 
                    WHERE id = ?
                ''', (total_price, user_id))

                # Commit transaction
                conn.commit()
                return True

            except Exception as e:
                conn.rollback()
                print(f"Purchase failed: {e}")
                return False
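
To see the all-or-nothing behavior in isolation, here is a self-contained sketch on an in-memory database. It is a simplified variant of the purchase flow, with two assumptions of its own: the connection is opened with isolation_level=None so the sqlite3 module issues no implicit BEGIN, and the transaction starts with BEGIN IMMEDIATE, which takes SQLite's write lock before the stock check. The function name purchase, the trimmed-down tables, and the sample rows are made up for illustration.

```python
import sqlite3

def purchase(conn, user_id, book_id, quantity):
    """Return True if the purchase committed, False if it was rolled back."""
    try:
        conn.execute("BEGIN IMMEDIATE")  # take the write lock up front
        book = conn.execute(
            "SELECT price, stock FROM books WHERE id = ?", (book_id,)
        ).fetchone()
        if book is None or book[1] < quantity:
            raise ValueError("Book not found or insufficient stock")
        total = book[0] * quantity

        # Decrement stock first, then check the balance, to show that the
        # rollback really undoes a change that was already made.
        conn.execute(
            "UPDATE books SET stock = stock - ? WHERE id = ?", (quantity, book_id)
        )
        user = conn.execute(
            "SELECT balance FROM users WHERE id = ?", (user_id,)
        ).fetchone()
        if user is None or user[0] < total:
            raise ValueError("User not found or insufficient balance")
        conn.execute(
            "UPDATE users SET balance = balance - ? WHERE id = ?", (total, user_id)
        )
        conn.execute("COMMIT")
        return True
    except Exception:
        if conn.in_transaction:
            conn.execute("ROLLBACK")
        return False

conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE books (id INTEGER PRIMARY KEY, price REAL, stock INTEGER)")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, balance REAL)")
conn.execute("INSERT INTO books VALUES (1, 10.0, 5)")
conn.execute("INSERT INTO users VALUES (1, 25.0)")

first = purchase(conn, 1, 1, 2)   # costs 20.0, which the user can afford
second = purchase(conn, 1, 1, 2)  # costs 20.0, but only 5.0 is left
stock = conn.execute("SELECT stock FROM books WHERE id = 1").fetchone()[0]
balance = conn.execute("SELECT balance FROM users WHERE id = 1").fetchone()[0]
```

The second call decrements the stock and then fails the balance check, so the rollback restores the stock to 3 and leaves the balance at 5.0. That is the guarantee the transaction buys: either both updates land or neither does.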