Skip to content
This repository was archived by the owner on Sep 3, 2024. It is now read-only.

Commit abf9e71

Browse files
further separating database initialization
1 parent 86cb14f commit abf9e71

File tree

3 files changed

+165
-113
lines changed

3 files changed

+165
-113
lines changed
Lines changed: 51 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,17 @@
11
using System;
22
using System.Data.SqlClient;
3-
using System.Text;
43
using System.Threading;
54
using System.Threading.Tasks;
65
using MySql.Data.MySqlClient;
76
using Npgsql;
87
using Serilog;
98
using SqlStreamStore.Infrastructure;
10-
using static SqlStreamStore.Server.Constants;
119

1210
namespace SqlStreamStore.Server
1311
{
1412
internal class DatabaseInitializer
1513
{
1614
private readonly SqlStreamStoreServerConfiguration _configuration;
17-
private readonly SqlStreamStoreFactory _streamStoreFactory;
1815

1916
public DatabaseInitializer(SqlStreamStoreServerConfiguration configuration)
2017
{
@@ -24,156 +21,101 @@ public DatabaseInitializer(SqlStreamStoreServerConfiguration configuration)
2421
}
2522

2623
_configuration = configuration;
27-
_streamStoreFactory = new SqlStreamStoreFactory(configuration);
2824
}
2925

3026
public Task Initialize(CancellationToken cancellationToken = default)
3127
{
3228
switch (_configuration.Provider)
3329
{
34-
case mssql:
35-
return InitializeMsSqlStreamStore(cancellationToken);
36-
case mysql:
37-
return InitializeMySqlStreamStore(cancellationToken);
38-
case postgres:
39-
return InitializePostgresStreamStore(cancellationToken);
30+
case Constants.mssql:
31+
return InitializeMsSql(cancellationToken);
32+
case Constants.mysql:
33+
return InitializeMySql(cancellationToken);
34+
case Constants.postgres:
35+
return InitializePostgres(cancellationToken);
4036
default:
41-
Log.Warning("Provider {provider} has no initialization.", _configuration.Provider);
37+
Log.Warning("Provider {provider} has no database initializer.", _configuration.Provider);
4238
return Task.CompletedTask;
4339
}
4440
}
4541

46-
private async Task InitializeMySqlStreamStore(CancellationToken cancellationToken)
42+
private async Task InitializeMySql(CancellationToken cancellationToken)
4743
{
4844
var connectionStringBuilder = new MySqlConnectionStringBuilder(_configuration.ConnectionString);
49-
50-
using (var streamStore = _streamStoreFactory.CreateMySqlStreamStore())
51-
{
52-
try
45+
using (var connection = new MySqlConnection(
46+
new MySqlConnectionStringBuilder(_configuration.ConnectionString)
5347
{
54-
using (var connection = new MySqlConnection(
55-
new MySqlConnectionStringBuilder(_configuration.ConnectionString)
56-
{
57-
Database = null
58-
}.ConnectionString))
59-
{
60-
await connection.OpenAsync(cancellationToken).NotOnCapturedContext();
61-
62-
using (var command = new MySqlCommand(
63-
$"CREATE DATABASE IF NOT EXISTS `{connectionStringBuilder.Database}`",
64-
connection))
65-
{
66-
await command.ExecuteNonQueryAsync(cancellationToken).NotOnCapturedContext();
67-
}
68-
}
48+
Database = null
49+
}.ConnectionString))
50+
{
51+
await connection.OpenAsync(cancellationToken).NotOnCapturedContext();
6952

70-
await streamStore.CreateSchemaIfNotExists(cancellationToken);
71-
}
72-
catch (SqlException ex)
53+
using (var command = new MySqlCommand(
54+
$"CREATE DATABASE IF NOT EXISTS `{connectionStringBuilder.Database}`",
55+
connection))
7356
{
74-
SchemaCreationFailed(streamStore.GetSchemaCreationScript, ex);
75-
throw;
57+
await command.ExecuteNonQueryAsync(cancellationToken).NotOnCapturedContext();
7658
}
7759
}
7860
}
7961

80-
private async Task InitializeMsSqlStreamStore(CancellationToken cancellationToken)
62+
private async Task InitializeMsSql(CancellationToken cancellationToken)
8163
{
8264
var connectionStringBuilder = new SqlConnectionStringBuilder(_configuration.ConnectionString);
83-
84-
using (var streamStore = _streamStoreFactory.CreateMsSqlStreamStore())
85-
{
86-
try
65+
using (var connection = new SqlConnection(
66+
new SqlConnectionStringBuilder(_configuration.ConnectionString)
8767
{
88-
using (var connection = new SqlConnection(
89-
new SqlConnectionStringBuilder(_configuration.ConnectionString)
90-
{
91-
InitialCatalog = "master"
92-
}.ConnectionString))
93-
{
94-
await connection.OpenAsync(cancellationToken).NotOnCapturedContext();
68+
InitialCatalog = "master"
69+
}.ConnectionString))
70+
{
71+
await connection.OpenAsync(cancellationToken).NotOnCapturedContext();
9572

96-
using (var command = new SqlCommand(
97-
$@"
73+
using (var command = new SqlCommand(
74+
$@"
9875
IF NOT EXISTS (SELECT name FROM sys.databases WHERE name = N'{connectionStringBuilder.InitialCatalog}')
9976
BEGIN
10077
CREATE DATABASE [{connectionStringBuilder.InitialCatalog}]
10178
END;
10279
",
103-
connection))
104-
{
105-
await command.ExecuteNonQueryAsync(cancellationToken).NotOnCapturedContext();
106-
}
107-
}
108-
109-
await streamStore.CreateSchemaIfNotExists(cancellationToken);
110-
}
111-
catch (SqlException ex)
80+
connection))
11281
{
113-
SchemaCreationFailed(streamStore.GetSchemaCreationScript, ex);
114-
throw;
82+
await command.ExecuteNonQueryAsync(cancellationToken).NotOnCapturedContext();
11583
}
11684
}
11785
}
11886

119-
private async Task InitializePostgresStreamStore(CancellationToken cancellationToken)
87+
private async Task InitializePostgres(CancellationToken cancellationToken)
12088
{
12189
var connectionStringBuilder = new NpgsqlConnectionStringBuilder(_configuration.ConnectionString);
122-
using (var streamStore = _streamStoreFactory.CreatePostgresStreamStore())
90+
using (var connection = new NpgsqlConnection(
91+
new NpgsqlConnectionStringBuilder(_configuration.ConnectionString)
92+
{
93+
Database = null
94+
}.ConnectionString))
12395
{
124-
try
96+
await connection.OpenAsync(cancellationToken).NotOnCapturedContext();
97+
98+
async Task<bool> DatabaseExists()
12599
{
126-
using (var connection = new NpgsqlConnection(
127-
new NpgsqlConnectionStringBuilder(_configuration.ConnectionString)
128-
{
129-
Database = null
130-
}.ConnectionString))
100+
using (var command = new NpgsqlCommand(
101+
$"SELECT 1 FROM pg_database WHERE datname = '{connectionStringBuilder.Database}'",
102+
connection))
131103
{
132-
await connection.OpenAsync(cancellationToken).NotOnCapturedContext();
133-
134-
async Task<bool> DatabaseExists()
135-
{
136-
using (var command = new NpgsqlCommand(
137-
$"SELECT 1 FROM pg_database WHERE datname = '{connectionStringBuilder.Database}'",
138-
connection))
139-
{
140-
return await command.ExecuteScalarAsync(cancellationToken).NotOnCapturedContext()
141-
!= null;
142-
}
143-
}
144-
145-
if (!await DatabaseExists())
146-
{
147-
using (var command = new NpgsqlCommand(
148-
$"CREATE DATABASE {connectionStringBuilder.Database}",
149-
connection))
150-
{
151-
await command.ExecuteNonQueryAsync(cancellationToken).NotOnCapturedContext();
152-
}
153-
}
154-
155-
await streamStore.CreateSchemaIfNotExists(cancellationToken);
104+
return await command.ExecuteScalarAsync(cancellationToken).NotOnCapturedContext()
105+
!= null;
156106
}
157107
}
158-
catch (NpgsqlException ex)
108+
109+
if (!await DatabaseExists())
159110
{
160-
SchemaCreationFailed(streamStore.GetSchemaCreationScript, ex);
161-
throw;
111+
using (var command = new NpgsqlCommand(
112+
$"CREATE DATABASE {connectionStringBuilder.Database}",
113+
connection))
114+
{
115+
await command.ExecuteNonQueryAsync(cancellationToken).NotOnCapturedContext();
116+
}
162117
}
163118
}
164119
}
165-
166-
private static void SchemaCreationFailed(Func<string> getSchemaCreationScript, Exception ex)
167-
=> Log.Error(
168-
new StringBuilder()
169-
.Append("Could not create schema: {ex}")
170-
.AppendLine()
171-
.Append(
172-
"Does your connection string have enough permissions? If not, run the following sql script as a privileged user:")
173-
.AppendLine()
174-
.Append("{script}")
175-
.ToString(),
176-
ex,
177-
getSchemaCreationScript());
178120
}
179121
}

src/SqlStreamStore.Server/Program.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,14 @@ private async Task<int> Run()
5050
case "initialize":
5151
case "init":
5252
await RunInitialization();
53-
break;
53+
return 0;
54+
case "initialize-database":
55+
case "init-database":
56+
await RunDatabaseInitialization();
57+
return 0;
5458
default:
5559
await RunServer();
56-
break;
60+
return 0;
5761
}
5862
}
5963
catch (Exception ex)
@@ -65,8 +69,6 @@ private async Task<int> Run()
6569
{
6670
Log.CloseAndFlush();
6771
}
68-
69-
return 0;
7072
}
7173

7274
private async Task RunServer()
@@ -92,6 +94,9 @@ await Task.WhenAll(
9294
}
9395

9496
private Task RunInitialization()
97+
=> new SqlStreamStoreInitializer(_configuration).Initialize(_cts.Token);
98+
99+
private Task RunDatabaseInitialization()
95100
=> new DatabaseInitializer(_configuration).Initialize(_cts.Token);
96101

97102
public void Dispose()
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
using System;
2+
using System.Data.SqlClient;
3+
using System.Text;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Npgsql;
7+
using Serilog;
8+
using static SqlStreamStore.Server.Constants;
9+
10+
namespace SqlStreamStore.Server
11+
{
12+
internal class SqlStreamStoreInitializer
13+
{
14+
private readonly SqlStreamStoreServerConfiguration _configuration;
15+
private readonly SqlStreamStoreFactory _streamStoreFactory;
16+
17+
public SqlStreamStoreInitializer(SqlStreamStoreServerConfiguration configuration)
18+
{
19+
if (configuration == null)
20+
{
21+
throw new ArgumentNullException(nameof(configuration));
22+
}
23+
24+
_configuration = configuration;
25+
_streamStoreFactory = new SqlStreamStoreFactory(configuration);
26+
}
27+
28+
public Task Initialize(CancellationToken cancellationToken = default)
29+
{
30+
switch (_configuration.Provider)
31+
{
32+
case mssql:
33+
return InitializeMsSqlStreamStore(cancellationToken);
34+
case mysql:
35+
return InitializeMySqlStreamStore(cancellationToken);
36+
case postgres:
37+
return InitializePostgresStreamStore(cancellationToken);
38+
default:
39+
Log.Warning("Provider {provider} has no initialization.", _configuration.Provider);
40+
return Task.CompletedTask;
41+
}
42+
}
43+
44+
private async Task InitializeMySqlStreamStore(CancellationToken cancellationToken)
45+
{
46+
using (var streamStore = _streamStoreFactory.CreateMySqlStreamStore())
47+
{
48+
try
49+
{
50+
await streamStore.CreateSchemaIfNotExists(cancellationToken);
51+
}
52+
catch (SqlException ex)
53+
{
54+
SchemaCreationFailed(streamStore.GetSchemaCreationScript, ex);
55+
throw;
56+
}
57+
}
58+
}
59+
60+
private async Task InitializeMsSqlStreamStore(CancellationToken cancellationToken)
61+
{
62+
using (var streamStore = _streamStoreFactory.CreateMsSqlStreamStore())
63+
{
64+
try
65+
{
66+
await streamStore.CreateSchemaIfNotExists(cancellationToken);
67+
}
68+
catch (SqlException ex)
69+
{
70+
SchemaCreationFailed(streamStore.GetSchemaCreationScript, ex);
71+
throw;
72+
}
73+
}
74+
}
75+
76+
private async Task InitializePostgresStreamStore(CancellationToken cancellationToken)
77+
{
78+
using (var streamStore = _streamStoreFactory.CreatePostgresStreamStore())
79+
{
80+
try
81+
{
82+
await streamStore.CreateSchemaIfNotExists(cancellationToken);
83+
}
84+
catch (NpgsqlException ex)
85+
{
86+
SchemaCreationFailed(streamStore.GetSchemaCreationScript, ex);
87+
throw;
88+
}
89+
}
90+
}
91+
92+
private static void SchemaCreationFailed(Func<string> getSchemaCreationScript, Exception ex)
93+
=> Log.Error(
94+
new StringBuilder()
95+
.Append("Could not create schema: {ex}")
96+
.AppendLine()
97+
.Append(
98+
"Does your connection string have enough permissions? If not, run the following sql script as a privileged user:")
99+
.AppendLine()
100+
.Append("{script}")
101+
.ToString(),
102+
ex,
103+
getSchemaCreationScript());
104+
}
105+
}

0 commit comments

Comments
 (0)