1+ using System ;
2+ using System . Data . SqlClient ;
3+ using System . Text ;
4+ using System . Threading ;
5+ using System . Threading . Tasks ;
6+ using MySql . Data . MySqlClient ;
7+ using Npgsql ;
8+ using Serilog ;
9+ using SqlStreamStore . Infrastructure ;
10+ using static SqlStreamStore . Server . Constants ;
11+
12+ namespace SqlStreamStore . Server
13+ {
14+ internal class DatabaseInitializer
15+ {
16+ private readonly SqlStreamStoreServerConfiguration _configuration ;
17+ private readonly SqlStreamStoreFactory _streamStoreFactory ;
18+
19+ public DatabaseInitializer ( SqlStreamStoreServerConfiguration configuration )
20+ {
21+ if ( configuration == null )
22+ {
23+ throw new ArgumentNullException ( nameof ( configuration ) ) ;
24+ }
25+
26+ _configuration = configuration ;
27+ _streamStoreFactory = new SqlStreamStoreFactory ( configuration ) ;
28+ }
29+
30+ public Task Initialize ( CancellationToken cancellationToken = default )
31+ {
32+ switch ( _configuration . Provider )
33+ {
34+ case mssql :
35+ return InitializeMsSqlStreamStore ( cancellationToken ) ;
36+ case mysql :
37+ return InitializeMySqlStreamStore ( cancellationToken ) ;
38+ case postgres :
39+ return InitializePostgresStreamStore ( cancellationToken ) ;
40+ default :
41+ Log . Warning ( "Provider {provider} has no initialization." , _configuration . Provider ) ;
42+ return Task . CompletedTask ;
43+ }
44+ }
45+
46+ private async Task InitializeMySqlStreamStore ( CancellationToken cancellationToken )
47+ {
48+ var connectionStringBuilder = new MySqlConnectionStringBuilder ( _configuration . ConnectionString ) ;
49+
50+ using ( var streamStore = _streamStoreFactory . CreateMySqlStreamStore ( ) )
51+ {
52+ try
53+ {
54+ using ( var connection = new MySqlConnection (
55+ new MySqlConnectionStringBuilder ( _configuration . ConnectionString )
56+ {
57+ Database = null
58+ } . ConnectionString ) )
59+ {
60+ await connection . OpenAsync ( cancellationToken ) . NotOnCapturedContext ( ) ;
61+
62+ using ( var command = new MySqlCommand (
63+ $ "CREATE DATABASE IF NOT EXISTS `{ connectionStringBuilder . Database } `",
64+ connection ) )
65+ {
66+ await command . ExecuteNonQueryAsync ( cancellationToken ) . NotOnCapturedContext ( ) ;
67+ }
68+ }
69+
70+ await streamStore . CreateSchemaIfNotExists ( cancellationToken ) ;
71+ }
72+ catch ( SqlException ex )
73+ {
74+ SchemaCreationFailed ( streamStore . GetSchemaCreationScript , ex ) ;
75+ throw ;
76+ }
77+ }
78+ }
79+
80+ private async Task InitializeMsSqlStreamStore ( CancellationToken cancellationToken )
81+ {
82+ var connectionStringBuilder = new SqlConnectionStringBuilder ( _configuration . ConnectionString ) ;
83+
84+ using ( var streamStore = _streamStoreFactory . CreateMsSqlStreamStore ( ) )
85+ {
86+ try
87+ {
88+ using ( var connection = new SqlConnection (
89+ new SqlConnectionStringBuilder ( _configuration . ConnectionString )
90+ {
91+ InitialCatalog = "master"
92+ } . ConnectionString ) )
93+ {
94+ await connection . OpenAsync ( cancellationToken ) . NotOnCapturedContext ( ) ;
95+
96+ using ( var command = new SqlCommand (
97+ $@ "
98+ IF NOT EXISTS (SELECT name FROM sys.databases WHERE name = N'{ connectionStringBuilder . InitialCatalog } ')
99+ BEGIN
100+ CREATE DATABASE [{ connectionStringBuilder . InitialCatalog } ]
101+ END;
102+ " ,
103+ connection ) )
104+ {
105+ await command . ExecuteNonQueryAsync ( cancellationToken ) . NotOnCapturedContext ( ) ;
106+ }
107+ }
108+
109+ await streamStore . CreateSchemaIfNotExists ( cancellationToken ) ;
110+ }
111+ catch ( SqlException ex )
112+ {
113+ SchemaCreationFailed ( streamStore . GetSchemaCreationScript , ex ) ;
114+ throw ;
115+ }
116+ }
117+ }
118+
119+ private async Task InitializePostgresStreamStore ( CancellationToken cancellationToken )
120+ {
121+ var connectionStringBuilder = new NpgsqlConnectionStringBuilder ( _configuration . ConnectionString ) ;
122+ using ( var streamStore = _streamStoreFactory . CreatePostgresStreamStore ( ) )
123+ {
124+ try
125+ {
126+ using ( var connection = new NpgsqlConnection (
127+ new NpgsqlConnectionStringBuilder ( _configuration . ConnectionString )
128+ {
129+ Database = null
130+ } . ConnectionString ) )
131+ {
132+ await connection . OpenAsync ( cancellationToken ) . NotOnCapturedContext ( ) ;
133+
134+ async Task < bool > DatabaseExists ( )
135+ {
136+ using ( var command = new NpgsqlCommand (
137+ $ "SELECT 1 FROM pg_database WHERE datname = '{ connectionStringBuilder . Database } '",
138+ connection ) )
139+ {
140+ return await command . ExecuteScalarAsync ( cancellationToken ) . NotOnCapturedContext ( )
141+ != null ;
142+ }
143+ }
144+
145+ if ( ! await DatabaseExists ( ) )
146+ {
147+ using ( var command = new NpgsqlCommand (
148+ $ "CREATE DATABASE { connectionStringBuilder . Database } ",
149+ connection ) )
150+ {
151+ await command . ExecuteNonQueryAsync ( cancellationToken ) . NotOnCapturedContext ( ) ;
152+ }
153+ }
154+
155+ await streamStore . CreateSchemaIfNotExists ( cancellationToken ) ;
156+ }
157+ }
158+ catch ( NpgsqlException ex )
159+ {
160+ SchemaCreationFailed ( streamStore . GetSchemaCreationScript , ex ) ;
161+ throw ;
162+ }
163+ }
164+ }
165+
166+ private static void SchemaCreationFailed ( Func < string > getSchemaCreationScript , Exception ex )
167+ => Log . Error (
168+ new StringBuilder ( )
169+ . Append ( "Could not create schema: {ex}" )
170+ . AppendLine ( )
171+ . Append (
172+ "Does your connection string have enough permissions? If not, run the following sql script as a privileged user:" )
173+ . AppendLine ( )
174+ . Append ( "{script}" )
175+ . ToString ( ) ,
176+ ex ,
177+ getSchemaCreationScript ( ) ) ;
178+ }
179+ }
0 commit comments