forked from MarginaliaSearch/MarginaliaSearch
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathLoaderMain.java
More file actions
131 lines (107 loc) · 4.99 KB
/
LoaderMain.java
File metadata and controls
131 lines (107 loc) · 4.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package nu.marginalia.loading;
import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.linkdb.docs.DocumentDbWriter;
import nu.marginalia.loading.documents.DocumentLoaderService;
import nu.marginalia.loading.documents.KeywordLoaderService;
import nu.marginalia.loading.domains.DomainIdRegistry;
import nu.marginalia.loading.domains.DomainLoaderService;
import nu.marginalia.loading.links.DomainLinksLoaderService;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqapi.loading.LoadRequest;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.storage.FileStorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

import static nu.marginalia.mqapi.ProcessInboxNames.LOADER_INBOX;
/**
 * Entry point of the loader process.
 *
 * <p>On startup it fetches a {@link LoadRequest} through {@link ProcessMainClass} (configured
 * with the {@code LOADER_INBOX} inbox), loads the processed data found in the requested file
 * storages, reports success or failure back on the instructions, and then terminates the JVM.
 */
public class LoaderMain extends ProcessMainClass {
    private static final Logger logger = LoggerFactory.getLogger(LoaderMain.class);

    private final ProcessHeartbeatImpl heartbeat;
    private final FileStorageService fileStorageService;
    private final DocumentDbWriter documentDbWriter;
    private final DomainLoaderService domainService;
    private final DomainLinksLoaderService linksService;
    private final KeywordLoaderService keywordLoaderService;
    private final DocumentLoaderService documentLoaderService;

    /**
     * Whether the link-loading step runs.
     *
     * <p>Enabled when the system property {@code loader.insertFoundDomains} is absent or equals
     * {@code "true"} (case-insensitive, per {@link Boolean#getBoolean}); any other value disables it.
     * NOTE(review): the name mentions "found domains" but here it only gates
     * {@code DomainLinksLoaderService.loadLinks} — confirm the intended meaning.
     */
    private static final boolean insertFoundDomains = Boolean.getBoolean("loader.insertFoundDomains")
            || (null == System.getProperty("loader.insertFoundDomains"));

    /**
     * Process entry point: builds the Guice injector, waits for load instructions and runs them.
     * Any failure is logged and not rethrown.
     */
    public static void main(String... args) {
        try {
            // Instantiated for its side effect of loading the MariaDB JDBC driver class
            // (presumably so it registers itself with DriverManager) — NOTE(review): confirm still needed.
            new org.mariadb.jdbc.Driver();

            Injector injector = Guice.createInjector(
                    new ProcessConfigurationModule("loader"),
                    new LoaderModule(),
                    new DatabaseModule(false)
            );

            var instance = injector.getInstance(LoaderMain.class);

            var instructions = instance.fetchInstructions(LoadRequest.class);
            logger.info("Instructions received");

            instance.run(instructions);
        }
        catch (Throwable ex) {
            logger.error("Error running loader", ex);
        }
    }

    /**
     * Creates the loader and immediately starts the process heartbeat.
     */
    @Inject
    public LoaderMain(ProcessHeartbeatImpl heartbeat,
                      MessageQueueFactory messageQueueFactory,
                      FileStorageService fileStorageService,
                      DocumentDbWriter documentDbWriter,
                      DomainLoaderService domainService,
                      DomainLinksLoaderService linksService,
                      KeywordLoaderService keywordLoaderService,
                      DocumentLoaderService documentLoaderService,
                      ProcessConfiguration processConfiguration,
                      Gson gson
    ) {
        super(messageQueueFactory, processConfiguration, gson, LOADER_INBOX);

        this.heartbeat = heartbeat;
        this.fileStorageService = fileStorageService;
        this.documentDbWriter = documentDbWriter;
        this.domainService = domainService;
        this.linksService = linksService;
        this.keywordLoaderService = keywordLoaderService;
        this.documentLoaderService = documentLoaderService;

        heartbeat.start();
    }

    /**
     * Executes a load request and reports the outcome on {@code instructions}.
     *
     * <p>Resolves the requested storages to paths and assigns domain ids. It then runs the keyword,
     * document and domain-metadata loaders, plus the link loader when {@link #insertFoundDomains}
     * is set, in parallel.
     *
     * <p>{@code instructions.ok()} is called only if every step succeeded, including closing the
     * writers. Otherwise {@code instructions.err()} is called. The JVM is then terminated with
     * exit status 0 in both cases.
     *
     * @param instructions the received load instructions
     * @throws Throwable if a non-{@link Exception} throwable escapes loading, or if reporting the outcome fails
     */
    void run(Instructions<LoadRequest> instructions) throws Throwable {
        boolean executionOk = false;

        try {
            // Resolved inside the try so that a failure here is still reported via
            // instructions.err(), and the writers and heartbeat are still closed.
            List<Path> inputSources = new ArrayList<>();
            for (var storageId : instructions.value().inputProcessDataStorageIds) {
                inputSources.add(fileStorageService.getStorage(storageId).asPath());
            }
            var inputData = new LoaderInputData(inputSources);

            DomainIdRegistry domainIdRegistry = domainService.getOrCreateDomainIds(heartbeat, inputData);

            executionOk = loadAll(domainIdRegistry, inputData);
        }
        catch (Exception ex) {
            // The process exits right after reporting, so the interrupt status of an
            // InterruptedException is deliberately not restored here.
            logger.error("Error", ex);
        }
        finally {
            // A failed close may mean data was not fully written, so it also counts as an error
            if (!closeResources()) {
                executionOk = false;
            }
        }

        if (executionOk) instructions.ok();
        else instructions.err();

        System.exit(0);
    }

    /**
     * Runs the independent load steps in parallel on a dedicated pool and waits for all of them.
     *
     * <p>Each task is joined individually so that an exception thrown inside a task is detected.
     * {@code ForkJoinPool.submit} captures such exceptions in the returned task, and closing the
     * pool only waits for completion without rethrowing them. The values returned by the load
     * calls are not inspected; only thrown exceptions count as failure.
     *
     * @return {@code true} if no task threw
     * @throws InterruptedException if interrupted while waiting for a task
     */
    private boolean loadAll(DomainIdRegistry domainIdRegistry, LoaderInputData inputData) throws InterruptedException {
        List<Future<?>> tasks = new ArrayList<>();
        boolean allOk = true;

        try (var pool = new ForkJoinPool(ForkJoinPool.getCommonPoolParallelism())) {
            tasks.add(pool.submit(() -> keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, inputData)));
            tasks.add(pool.submit(() -> documentLoaderService.loadDocuments(domainIdRegistry, heartbeat, inputData)));
            tasks.add(pool.submit(() -> domainService.loadDomainMetadata(domainIdRegistry, heartbeat, inputData)));

            if (insertFoundDomains) {
                tasks.add(pool.submit(() -> linksService.loadLinks(domainIdRegistry, heartbeat, inputData)));
            }

            // Wait for every task rather than stopping at the first failure, so all errors are logged
            for (Future<?> task : tasks) {
                try {
                    task.get();
                }
                catch (ExecutionException ex) {
                    allOk = false;
                    logger.error("Error", ex.getCause());
                }
            }
        }

        return allOk;
    }

    /**
     * Closes the keyword loader and the document db writer, then shuts down the heartbeat.
     * Every step is attempted even if an earlier one throws.
     *
     * @return {@code true} if both the keyword loader and the document db writer closed without throwing
     */
    private boolean closeResources() {
        boolean ok = true;

        try {
            keywordLoaderService.close();
        }
        catch (Exception ex) {
            ok = false;
            logger.error("Error closing keyword loader", ex);
        }

        try {
            documentDbWriter.close();
        }
        catch (Exception ex) {
            ok = false;
            logger.error("Error closing document db writer", ex);
        }

        try {
            heartbeat.shutDown();
        }
        catch (Exception ex) {
            // Heartbeat failure does not affect the loaded data, so it is only logged
            logger.error("Error shutting down heartbeat", ex);
        }

        return ok;
    }
}