using System; using System.Collections.Generic; using System.Data.SQLite; using System.Linq; using System.Text; using System.Threading.Tasks; namespace AppLiteSql { publicclassDatabaseManager { // 创建一个静态锁对象,确保线程同步 privatestatic readonly object _lock = new object(); // 数据库连接字符串 privatestring _connectionString; publicDatabaseManager(string dbPath) { _connectionString = $"Data Source={dbPath};Version=3;"; } // 线程安全的插入方法 publicvoidThreadSafeInsert(string name, int age) { // 使用锁确保同步 lock (_lock) { using (var connection = new SQLiteConnection(_connectionString)) { connection.Open(); using (var command = new SQLiteCommand(connection)) { command.CommandText = @" INSERT INTO Users (Name, Age) VALUES (@Name, @Age)"; command.Parameters.AddWithValue("@Name", name); command.Parameters.AddWithValue("@Age", age); command.ExecuteNonQuery(); } } } } // 线程安全的查询方法 publicintGetUserCount() { lock (_lock) { using (var connection = new SQLiteConnection(_connectionString)) { connection.Open(); using (var command = new SQLiteCommand("SELECT COUNT(*) FROM Users", connection)) { return Convert.ToInt32(command.ExecuteScalar()); } } } } } }
namespace AppLiteSql { internal classProgram { staticvoidMain(string[] args) { DatabaseManager dbManager = new DatabaseManager("D:\\myproject\\11Test\\AppLiteSql\\db"); // 创建多个线程并发插入数据 var threads = new List<Thread>(); for (int i = 0; i < 10; i++) { int threadId = i; var thread = new Thread(() => { for (int j = 0; j < 100; j++) { dbManager.ThreadSafeInsert($"User_{threadId}_{j}", 30 + threadId); } }); threads.Add(thread); thread.Start(); } // 等待所有线程完成 foreach (var thread in threads) { thread.Join(); } // 验证插入结果 int totalUsers = dbManager.GetUserCount(); Console.WriteLine($"Total Users: {totalUsers}"); } } }
高级并发控制策略
信号量控制数据库连接池
using System; using System.Collections.Generic; using System.Data.SQLite; using System.Linq; using System.Text; using System.Threading.Tasks; namespace AppLiteSql { publicclassAdvancedDatabaseManager { // 使用信号量控制并发连接数 private readonly SemaphoreSlim _connectionSemaphore; privatestring _connectionString; publicAdvancedDatabaseManager(string dbPath, int maxConcurrentConnections = 5) { _connectionString = $"Data Source={dbPath};Version=3;"; _connectionSemaphore = new SemaphoreSlim(maxConcurrentConnections); } // 异步并发查询方法 public async Task<int> ConcurrentQueryAsync(string query) { await _connectionSemaphore.WaitAsync(); try { using (var connection = new SQLiteConnection(_connectionString)) { await connection.OpenAsync(); using (var command = new SQLiteCommand(query, connection)) { return Convert.ToInt32(await command.ExecuteScalarAsync()); } } } finally { _connectionSemaphore.Release(); } } // 异步读取多行数据的方法 public async Task<List<User>> ReadUsersAsync(string condition = null) { await _connectionSemaphore.WaitAsync(); try { using (var connection = new SQLiteConnection(_connectionString)) { await connection.OpenAsync(); string query = "SELECT Id, Name, Age FROM Users"; if (!string.IsNullOrEmpty(condition)) { query += $" WHERE {condition}"; } using (var command = new SQLiteCommand(query, connection)) { var users = new List<User>(); using (var reader = await command.ExecuteReaderAsync()) { while (await reader.ReadAsync()) { users.Add(new User { Id = reader.GetInt32(0), Name = reader.GetString(1), Age = reader.GetInt32(2) }); } } return users; } } } finally { _connectionSemaphore.Release(); } } // 异步写入数据的方法 public async Task<int> WriteUserAsync(User user) { await _connectionSemaphore.WaitAsync(); try { using (var connection = new SQLiteConnection(_connectionString)) { await connection.OpenAsync(); using (var command = new SQLiteCommand(connection)) { command.CommandText = @" INSERT INTO Users (Name, Age) VALUES (@Name, @Age); SELECT last_insert_rowid();"; command.Parameters.AddWithValue("@Name", user.Name); command.Parameters.AddWithValue("@Age", user.Age); return Convert.ToInt32(await command.ExecuteScalarAsync()); } } } finally { _connectionSemaphore.Release(); } } // 异步批量写入数据的方法 public async Task BulkWriteUsersAsync(List<User> users) { await _connectionSemaphore.WaitAsync(); try { using (var connection = new SQLiteConnection(_connectionString)) { await connection.OpenAsync(); using (var transaction = connection.BeginTransaction()) { try { using (var command = new SQLiteCommand(connection)) { command.CommandText = @" INSERT INTO Users (Name, Age) VALUES (@Name, @Age)"; var nameParam = command.Parameters.Add("@Name", System.Data.DbType.String); var ageParam = command.Parameters.Add("@Age", System.Data.DbType.Int32); foreach (var user in users) { nameParam.Value = user.Name; ageParam.Value = user.Age; await command.ExecuteNonQueryAsync(); } } await transaction.CommitAsync(); } catch { await transaction.RollbackAsync(); throw; } } } } finally { _connectionSemaphore.Release(); } } // 异步更新数据的方法 public async Task<int> UpdateUserAsync(int id, User updatedUser) { await _connectionSemaphore.WaitAsync(); try { using (var connection = new SQLiteConnection(_connectionString)) { await connection.OpenAsync(); using (var command = new SQLiteCommand(connection)) { command.CommandText = @" UPDATE Users SET Name = @Name, Age = @Age WHERE Id = @Id"; command.Parameters.AddWithValue("@Name", updatedUser.Name); command.Parameters.AddWithValue("@Age", updatedUser.Age); command.Parameters.AddWithValue("@Id", id); return await command.ExecuteNonQueryAsync(); } } } finally { _connectionSemaphore.Release(); } } } }
namespace AppLiteSql { internal classProgram { static async Task Main(string[] args) { AdvancedDatabaseManager dbManager = new AdvancedDatabaseManager("D:\\myproject\\11Test\\AppLiteSql\\db"); // 写入单个用户 var newUser = new User { Name = "John Doe", Age = 30 }; int newUserId = await dbManager.WriteUserAsync(newUser); // 批量写入用户 var userList = new List<User> { new User { Name = "Alice", Age = 25 }, new User { Name = "Bob", Age = 35 } }; await dbManager.BulkWriteUsersAsync(userList); // 读取用户 var users = await dbManager.ReadUsersAsync("Age > 20"); foreach (var user in users) { Console.WriteLine($"User: {user.Name}, Age: {user.Age}"); } // 更新用户 var updatedUser = new User { Name = "John Smith", Age = 31 }; await dbManager.UpdateUserAsync(newUserId, updatedUser); } } }