Python 程式設計/資料庫
Python 透過簡單的 API 支援與資料庫互動。Python 包含的模組包括用於 SQLite 和 Berkeley DB 的模組。用於 MySQL 、PostgreSQL 、FirebirdSQL 等其他資料庫的模組作為第三方模組提供。後者需要在使用前下載並安裝。例如,可以使用 Debian 軟體包“python-mysqldb”安裝包 MySQLdb。
使用 MySQL 的示例如下所示
import MySQLdb
db = MySQLdb.connect("host machine", "dbuser", "password", "dbname")
cursor = db.cursor()
query = """SELECT * FROM sampletable"""
lines = cursor.execute(query)
data = cursor.fetchall()
db.close()
在第一行, 模組MySQLdb被匯入。然後建立與資料庫的連線,在第 4 行,我們將要執行的實際 SQL 語句儲存到變數query中。在第 5 行,我們執行查詢,在第 6 行,我們獲取所有資料。執行完這段程式碼後,lines包含獲取的行的數量(例如,表中的行數sampletable)。變數data包含所有實際資料,例如,sampletable的內容。最後,將再次關閉與資料庫的連線。如果行的數量很大,最好使用row = cursor.fetchone()並分別處理行
#first 5 lines are the same as above
while True:
row = cursor.fetchone()
if row == None: break
#do something with this row of data
db.close()
顯然,必須對行使用某種資料處理,否則資料將不會被儲存。 的結果fetchone()命令是一個 元組。
為了使連線的初始化更容易,可以使用配置檔案
import MySQLdb
db = MySQLdb.connect(read_default_file="~/.my.cnf")
...
這裡,主目錄中的 .my.cnf 檔案包含 MySQL 所需的配置資訊。
使用 SQLite 的示例與上面的示例非常相似,並且遊標提供了許多相同的功能。
import sqlite3
db = sqlite3.connect("/path/to/file")
cursor = db.cursor()
query = """SELECT * FROM sampletable"""
lines = cursor.execute(query)
data = cursor.fetchall()
db.close()
寫入資料庫時,必須記住呼叫 db.commit(),否則更改不會儲存
import sqlite3
db = sqlite3.connect("/path/to/file")
cursor = db.cursor()
query = """INSERT INTO sampletable (value1, value2) VALUES (1,'test')"""
cursor.execute(query)
db.commit()
db.close()
import psycopg2
conn = psycopg2.connect("dbname=test")
cursor = conn.cursor()
cursor.execute("select * from test");
for i in cursor.next():
print(i)
conn.close()
import firebirdsql
conn = firebirdsql.connect(dsn='localhost/3050:/var/lib/firebird/2.5/test.fdb', user='alice', password='wonderland')
cur = conn.cursor()
cur.execute("select * from baz")
for c in cur.fetchall():
print(c)
conn.close()
您經常需要將動態資料替換到查詢字串中。確保正確完成此操作很重要。
# Do not do this!
result = db.execute("SELECT name FROM employees WHERE location = '" + location + "'")
此示例錯誤,因為它沒有正確處理被替換字串中的特殊字元,如撇號。如果您的程式碼必須處理潛在的惡意使用者(例如在面向公眾的 Web 伺服器上),這可能會使您面臨SQL 注入攻擊的風險。
對於簡單的情況,請使用 execute 方法提供的自動引數替換,例如
result = db.execute("SELECT name FROM employees WHERE location = ?", [location])
DBMS 介面本身會自動將您傳入的值轉換為正確的 SQL 語法。
對於更復雜的情況,DBMS 模組應提供一個您可以明確呼叫的引用函式。例如,MySQLdb 提供 escape_string 方法,而 APSW(用於 SQLite3)提供 format_sql_value。當查詢結構採用更動態的形式時,這是必需的
criteria = [("company", company)] # list of tuples (fieldname, value)
if department != None :
criteria.append(("department", department))
# ... append other optional criteria as appropriate ...
result = db.execute(
"SELECT name FROM employees WHERE "
+
" and ".join(
"%s = %s" % (criterion[0], MySQLdb.escape_string(criterion[1]))
for criterion in criteria
)
)
這將動態構建查詢,例如“select name from employees where company = 'some company'”或“select name from employees where company = 'some company' and department = 'some department'”,具體取決於使用者填寫了哪些欄位。
Python 迭代器非常適合迭代大量資料庫記錄的問題。以下是一個函式的示例,它執行資料庫查詢並返回結果的迭代器,而不是一次返回所有結果。它依賴於這樣一個事實:在 APSW(SQLite 的 Python 3 介面庫)中,cursor.execute 方法本身返回結果記錄的迭代器。結果是,您可以用非常簡潔的程式碼在 Python 中執行復雜的資料庫查詢。
def db_iter(db, cmd, mapfn = lambda x : x) :
"executes cmd on a new cursor from connection db and yields the results in turn."
cu = db.cursor()
result = cu.execute(cmd)
while True:
yield mapfn(next(result))
此函式的示例用法
for artist, publisher in db_iter(
db = db,
cmd =
"SELECT artist, publisher FROM artists WHERE location = %s"
%
apsw.format_sql_value(location)
):
print(artist, publisher)
和
for location in db_iter(
db = db,
cmd = "SELECT DISTINCT location FROM artists",
mapfn = lambda x : x[0]
):
print(location)
在第一個示例中,由於 db_iter 為每個記錄返回一個元組,因此可以將其直接分配給記錄欄位的各個變數。在第二個示例中,元組只有一個元素,因此使用自定義 mapfn 來提取此元素並返回它,而不是元組。
資料庫表定義經常會發生變化。隨著應用程式需求的演變,欄位甚至整個表經常被新增,或者有時被刪除。考慮以下語句
result = db.execute("select * from employees")
您可能碰巧知道employees表當前包含例如 4 個欄位。但是明天有人可能會新增第五個欄位。您是否記得更新您的程式碼以處理這種情況?如果沒有,它可能會崩潰。或者更糟,產生錯誤的結果!
最好始終列出您感興趣的特定欄位,無論有多少欄位
result = db.execute("select name, address, department, location from employees")
這樣,任何新增的額外欄位都將被忽略。如果刪除了任何命名欄位,程式碼至少會以執行時錯誤失敗,這提醒您忘記更新它!
考慮以下場景:您的銷售公司資料庫有一個員工表,還有一個每個員工銷售記錄的表。您想遍歷這些銷售條目,並生成一些每個員工的統計資料。一種天真的方法可能是
- 查詢資料庫以獲取員工列表
- 對於每個員工,查詢資料庫以獲取每個員工的銷售列表。
如果您有很多員工,那麼第一個查詢可能會生成一個很大的列表,而第二步將涉及相應數量的資料庫查詢。
事實上,整個處理迴圈可以從一個單個數據庫查詢中執行,使用標準 SQL 結構稱為 join。
以下是一個此類迴圈的示例
rows = db_iter \
(
db = db,
cmd =
"select employees.name, sales.amount, sales.date from"
" employees left join sales on employees.id = sales.employee_id"
" order by employees.name, sales.date"
)
prev_employee_name = None
while True:
row = next(rows, None)
if row != None :
employee_name, amount, date = row
if row == None or employee_name != prev_employee_name :
if prev_employee_name != None :
# done stats for this employee
report(prev_employee_name, employee_stats)
if row == None :
break
# start stats for a new employee
prev_employee_name = employee_name
employee_stats = {"total_sales" : 0, "number_of_sales" : 0}
if date != None :
employee_stats["earliest_sale"] = date
# another row of stats for this employee
if amount != None :
employee_stats["total_sales"] += amount
employee_stats["number_of_sales"] += 1
if date != None :
employee_stats["latest_sale"] = date
這裡的統計資訊非常簡單:最早和最新的銷售、銷售數量和總額,並且可以直接在 SQL 查詢中計算。但是,相同的迴圈可以計算更復雜的統計資訊(例如標準偏差),這些統計資訊無法直接在簡單的 SQL 查詢中表示。
注意每個員工的統計資訊是如何在兩種條件之一下寫出的
- 下一條記錄的員工姓名與上一條記錄不同
- 已達到查詢結果的末尾。
這兩個條件都使用row == None or employee_name != prev_employee_name進行測試;在寫出員工統計資料後,使用單獨的檢查第二個條件row == None來終止迴圈。如果迴圈沒有終止,則為新員工初始化處理。
還要注意在這種情況下使用left join:如果員工沒有銷售額,則連線將為該員工返回一行,其中 SQL null 值(在 Python 中表示為None)用於來自銷售表的欄位。這就是為什麼在處理這些欄位之前需要檢查這些None值。
或者,我們可以使用inner join,它將對沒有銷售額的員工返回沒有結果。是否要從報表中省略此類員工,或者將他們包含在內,總計為零,這實際上取決於您的應用程式。
- APSW 模組,code.google.com — Python 2.x 和 3.x 的 SQLite3
- SQLite 文件
- Psycopg2(PostgreSQL 模組 - 更新版),initd.org
- PyGreSQL(PostgreSQL 模組 - 舊版),pygresql.org
- MySQL 模組,sourceforge.net
- FirebirdSQL 模組,github.com