removed threads from inside of matrixupdate, made executor be instance based and not shut down

This commit is contained in:
jenzur 2019-03-05 17:08:13 +01:00
parent 7f154f3456
commit 1fe4a268c4
2 changed files with 182 additions and 190 deletions

View File

@ -25,7 +25,6 @@ import edu.stanford.nlp.trees.Tree;
import edu.stanford.nlp.trees.TreebankLanguagePack; import edu.stanford.nlp.trees.TreebankLanguagePack;
import java.io.IOException; import java.io.IOException;
import java.io.StringReader; import java.io.StringReader;
import java.lang.management.ManagementFactory;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -40,7 +39,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -59,7 +57,6 @@ public class MYSQLDatahandler {
public static int semeticsUpdateCount; public static int semeticsUpdateCount;
private static int counter = 0; private static int counter = 0;
private volatile boolean refreshMatrixFromDB; private volatile boolean refreshMatrixFromDB;
private volatile boolean permitted = false;
private final ConcurrentMap<Integer, String> stringCache; private final ConcurrentMap<Integer, String> stringCache;
private LinkedHashMap<String, LinkedHashMap<String, Double>> lHMSMX = new LinkedHashMap(); private LinkedHashMap<String, LinkedHashMap<String, Double>> lHMSMX = new LinkedHashMap();
private ConcurrentMap<Integer, String> multiprocessCalculations; private ConcurrentMap<Integer, String> multiprocessCalculations;
@ -80,6 +77,7 @@ public class MYSQLDatahandler {
private static LexicalizedParser lp; private static LexicalizedParser lp;
private static TreebankLanguagePack tlp; private static TreebankLanguagePack tlp;
private static AbstractSequenceClassifier<CoreLabel> classifier; private static AbstractSequenceClassifier<CoreLabel> classifier;
private ExecutorService executor;
private static StanfordCoreNLP pipeline; private static StanfordCoreNLP pipeline;
private static StanfordCoreNLP pipelineSentiment; private static StanfordCoreNLP pipelineSentiment;
@ -91,6 +89,12 @@ public class MYSQLDatahandler {
MYSQLDatahandler.classifier = classifier; MYSQLDatahandler.classifier = classifier;
} }
public void instantiateExecutor() {
this.executor = new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public MYSQLDatahandler() { public MYSQLDatahandler() {
this.stopwatch = Stopwatch.createUnstarted(); this.stopwatch = Stopwatch.createUnstarted();
this.stopwatch1 = Stopwatch.createStarted(); this.stopwatch1 = Stopwatch.createStarted();
@ -161,6 +165,14 @@ public class MYSQLDatahandler {
return LHM; return LHM;
} }
public int getlHMSMXSize() {
return lHMSMX.size();
}
public int getstringCacheSize() {
return stringCache.size();
}
public void initiateMYSQL() throws SQLException, IOException { public void initiateMYSQL() throws SQLException, IOException {
try { try {
DataMapper.createTables(); DataMapper.createTables();
@ -174,187 +186,171 @@ public class MYSQLDatahandler {
public synchronized void checkIfUpdateMatrixes() { public synchronized void checkIfUpdateMatrixes() {
refreshMatrixFromDB = false; refreshMatrixFromDB = false;
int calculationBoundaries = 10; int calculationBoundaries = 5;
int updateBadgesInteger = 250; int updateBadgesInteger = 250;
while (lHMSMX.size() < (stringCache.values().size() * stringCache.values().size()) - stringCache.values().size()) { if (stopwatch1.elapsed(TimeUnit.SECONDS) >= EXPIRE_TIME_IN_SECONDS1) {
if (stopwatch1.elapsed(TimeUnit.SECONDS) >= EXPIRE_TIME_IN_SECONDS1) { refreshMatrixFromDB = true;
refreshMatrixFromDB = true; lHMSMX = DataMapper.getAllRelationScores();
lHMSMX = DataMapper.getAllRelationScores(); stopwatch1.reset();
stopwatch1.reset(); }
} //requiring atleast 10 entries ensures no issues in case of empty stringcache
//requiring atleast 10 entries ensures no issues in case of empty stringcache if (stringCache.values().size() > 10 && !refreshMatrixFromDB) {
if (stringCache.values().size() > 10 && !refreshMatrixFromDB) { if (counter <= 5) {
if (counter <= 5) { counter++;
counter++; ConcurrentMap<Integer, String> stringCachelocal = stringCache;
ConcurrentMap<Integer, String> stringCachelocal = stringCache; List<Integer> updateLocal = updatedRows;
List<Integer> updateLocal = updatedRows; int random = -1;
int random = -1; if (!updateLocal.contains(random)) {
if (!updateLocal.contains(random)) {
updatedRows.add(random);
}
Collections.sort(updateLocal);
while (updateLocal.contains(random)) {
random = new Random().nextInt(stringCachelocal.values().size() - 6);
int indexPrev = Collections.binarySearch(updateLocal, random);
int indexNext = Collections.binarySearch(updateLocal, random + 6);
//-1 will always be index 0
if (indexPrev > 0 && indexNext > 0) {
indexPrev = updateLocal.get(indexPrev);
indexNext = updateLocal.get(indexNext);
}
random = indexPrev < random - 5 && indexNext < random ? random : -1;
}
updatedRows.add(random); updatedRows.add(random);
semeticsUpdateCount = random;
int beginindex = semeticsUpdateCount;
semeticsUpdateCount += calculationBoundaries;
int temp = semeticsUpdateCount;
System.out.println("beginindex: " + beginindex + "\ntemp: " + temp + "\n");
ConcurrentMap<Integer, String> strIndexNavigator = new MapMaker().concurrencyLevel(2).makeMap();
int ij = 0;
while (beginindex + ij < temp) {
String get = stringCachelocal.get(beginindex + ij);
strIndexNavigator.put(ij, get);
multiprocessCalculations.put(multiprocessCalculations.size() + 1, get);
ij++;
}
LinkedHashMap<String, LinkedHashMap<String, Double>> LHMSMXLocal = lHMSMX;
ConcurrentMap<Integer, String> strIndexAll = stringCachelocal;
ConcurrentMap<Integer, String> strIndexNavigatorL = strIndexNavigator;
ConcurrentMap<Integer, String> randomIndexesToUpdate = new MapMaker().concurrencyLevel(2).makeMap();
int indexes = updateBadgesInteger;
if (indexes >= strIndexAll.size()) {
indexes = strIndexAll.size() - 1;
}
int beginindexes = new Random().nextInt((strIndexAll.size()) - indexes);
int ij1 = 0;
while (beginindexes + ij1 < beginindexes + indexes) {
String get1 = strIndexAll.get(beginindexes + ij1);
randomIndexesToUpdate.put(ij1, get1);
ij1++;
}
ConcurrentMap<Integer, SimilarityMatrix> matrixUpdateList = new MapMaker().concurrencyLevel(2).makeMap();
ConcurrentMap<Integer, Future<SimilarityMatrix>> futures = new MapMaker().concurrencyLevel(2).makeMap();
System.out.println("activecount: " + java.lang.Thread.activeCount() + "\nThreadCount: " + ManagementFactory.getThreadMXBean().getThreadCount()
+ "\navailableprocessors: " + Runtime.getRuntime().availableProcessors() + "\n");
ExecutorService executor = new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
strIndexNavigatorL.values().forEach((str) -> {
randomIndexesToUpdate.values().stream().filter((str1) -> (!str.equals(str1))).forEachOrdered((str1) -> {
boolean present = false;
if (multiprocessCalculations.values().contains(str1)) {
present = true;
} else if (LHMSMXLocal.containsKey(str)) {
LinkedHashMap<String, Double> orDefault = LHMSMXLocal.get(str);
if (orDefault.containsKey(str1)) {
present = true;
}
} else if (LHMSMXLocal.containsKey(str1)) {
LinkedHashMap<String, Double> orDefault = LHMSMXLocal.get(str1);
if (orDefault.containsKey(str)) {
present = true;
}
}
if (!present) {
SimilarityMatrix SMX = new SimilarityMatrix(str, str1);
Callable<SimilarityMatrix> worker = new SentimentAnalyzerTest(str, str1, SMX);
futures.put(futures.size() + 1, executor.submit(worker));
}
});
});
executor.shutdown();
try {
System.out.println("finished worker assignment, futures size: " + futures.size() + "\n");
for (Future<SimilarityMatrix> future : futures.values()) {
SimilarityMatrix SMX = future.get();
LinkedHashMap<String, Double> get = lHMSMX.getOrDefault(SMX.getPrimaryString(), null);
if (get == null) {
get = new LinkedHashMap();
}
get.put(SMX.getSecondaryString(), SMX.getDistance());
lHMSMX.put(SMX.getPrimaryString(), get);
matrixUpdateList.put(matrixUpdateList.size() + 1, SMX);
}
} catch (InterruptedException | ExecutionException ex) {
Logger.getLogger(MYSQLDatahandler.class.getName()).log(Level.SEVERE, null, ex);
}
new Thread(() -> {
try {
if (!matrixUpdateList.isEmpty()) {
DataMapper.insertSementicMatrixes(matrixUpdateList);
System.out.println("finished datamapper semetic insert");
}
} catch (CustomError ex) {
Logger.getLogger(MYSQLDatahandler.class
.getName()).log(Level.SEVERE, null, ex);
}
}).start();
} else if (!permitted) {
permitted = true;
new Thread(() -> {
ConcurrentMap<Integer, String> stringCachelocal = stringCache;
ConcurrentMap<Integer, SimilarityMatrix> matrixUpdateList = new MapMaker().concurrencyLevel(2).makeMap();
ConcurrentMap<Integer, Future<SimilarityMatrix>> futures = new MapMaker().concurrencyLevel(2).makeMap();
ExecutorService executor = new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
Collection<String> values = multiprocessCalculations.values();
LinkedHashMap<String, LinkedHashMap<String, Double>> LHMSMXLocal = lHMSMX;
values.forEach((str) -> {
stringCachelocal.values().stream().filter((str1) -> (!str.equals(str1))).forEachOrdered((str1) -> {
boolean present = false;
if (LHMSMXLocal.containsKey(str)) {
LinkedHashMap<String, Double> orDefault = LHMSMXLocal.get(str);
if (orDefault.containsKey(str1)) {
present = true;
}
} else if (LHMSMXLocal.containsKey(str1)) {
LinkedHashMap<String, Double> orDefault = LHMSMXLocal.get(str1);
if (orDefault.containsKey(str)) {
present = true;
}
}
if (!present) {
SimilarityMatrix SMX = new SimilarityMatrix(str, str1);
Callable<SimilarityMatrix> worker = new SentimentAnalyzerTest(str, str1, SMX);
futures.put(futures.size() + 1, executor.submit(worker));
}
});
});
executor.shutdown();
try {
System.out.println("finished worker assignment, futures size: " + futures.size() + "\n");
for (Future<SimilarityMatrix> future : futures.values()) {
SimilarityMatrix SMX = future.get();
LinkedHashMap<String, Double> get = lHMSMX.getOrDefault(SMX.getPrimaryString(), null);
if (get == null) {
get = new LinkedHashMap();
}
get.put(SMX.getSecondaryString(), SMX.getDistance());
lHMSMX.put(SMX.getPrimaryString(), get);
matrixUpdateList.put(matrixUpdateList.size() + 1, SMX);
}
} catch (InterruptedException | ExecutionException ex) {
Logger.getLogger(MYSQLDatahandler.class.getName()).log(Level.SEVERE, null, ex);
}
new Thread(() -> {
try {
multiprocessCalculations = new MapMaker().concurrencyLevel(2).makeMap();
updatedRows = new ArrayList();
counter = 0;
permitted = false;
if (!matrixUpdateList.isEmpty()) {
DataMapper.insertSementicMatrixes(matrixUpdateList);
System.out.println("finished datamapper semetic insert");
}
} catch (CustomError ex) {
Logger.getLogger(MYSQLDatahandler.class
.getName()).log(Level.SEVERE, null, ex);
}
}).start();
}).start();
} }
Collections.sort(updateLocal);
while (updateLocal.contains(random)) {
random = new Random().nextInt(stringCachelocal.values().size() - 6);
int indexPrev = Collections.binarySearch(updateLocal, random);
int indexNext = Collections.binarySearch(updateLocal, random + 6);
//-1 will always be index 0
if (indexPrev > 0 && indexNext > 0) {
indexPrev = updateLocal.get(indexPrev);
indexNext = updateLocal.get(indexNext);
}
random = indexPrev < random - 5 && indexNext < random ? random : -1;
}
updatedRows.add(random);
semeticsUpdateCount = random;
int beginindex = semeticsUpdateCount;
semeticsUpdateCount += calculationBoundaries;
int temp = semeticsUpdateCount;
System.out.println("beginindex: " + beginindex + "\ntemp: " + temp + "\n");
ConcurrentMap<Integer, String> strIndexNavigator = new MapMaker().concurrencyLevel(2).makeMap();
int ij = 0;
while (beginindex + ij < temp) {
String get = stringCachelocal.get(beginindex + ij);
strIndexNavigator.put(ij, get);
multiprocessCalculations.put(multiprocessCalculations.size() + 1, get);
ij++;
}
LinkedHashMap<String, LinkedHashMap<String, Double>> LHMSMXLocal = lHMSMX;
ConcurrentMap<Integer, String> strIndexAll = stringCachelocal;
ConcurrentMap<Integer, String> strIndexNavigatorL = strIndexNavigator;
ConcurrentMap<Integer, String> randomIndexesToUpdate = new MapMaker().concurrencyLevel(2).makeMap();
int indexes = updateBadgesInteger;
if (indexes >= strIndexAll.size()) {
indexes = strIndexAll.size() - 1;
}
int beginindexes = new Random().nextInt((strIndexAll.size()) - indexes);
int ij1 = 0;
while (beginindexes + ij1 < beginindexes + indexes) {
String get1 = strIndexAll.get(beginindexes + ij1);
randomIndexesToUpdate.put(ij1, get1);
ij1++;
}
ConcurrentMap<Integer, SimilarityMatrix> matrixUpdateList = new MapMaker().concurrencyLevel(2).makeMap();
ConcurrentMap<Integer, Future<SimilarityMatrix>> futures = new MapMaker().concurrencyLevel(2).makeMap();
strIndexNavigatorL.values().forEach((str) -> {
randomIndexesToUpdate.values().stream().filter((str1) -> (!str.equals(str1))).forEachOrdered((str1) -> {
boolean present = false;
if (multiprocessCalculations.values().contains(str1)) {
present = true;
} else if (LHMSMXLocal.containsKey(str)) {
LinkedHashMap<String, Double> orDefault = LHMSMXLocal.get(str);
if (orDefault.containsKey(str1)) {
present = true;
}
} else if (LHMSMXLocal.containsKey(str1)) {
LinkedHashMap<String, Double> orDefault = LHMSMXLocal.get(str1);
if (orDefault.containsKey(str)) {
present = true;
}
}
if (!present) {
SimilarityMatrix SMX = new SimilarityMatrix(str, str1);
Callable<SimilarityMatrix> worker = new SentimentAnalyzerTest(str, str1, SMX);
futures.put(futures.size() + 1, executor.submit(worker));
}
});
});
try {
System.out.println("finished worker assignment, futures size: " + futures.size() + "\n");
for (Future<SimilarityMatrix> future : futures.values()) {
SimilarityMatrix SMX = future.get();
LinkedHashMap<String, Double> get = lHMSMX.getOrDefault(SMX.getPrimaryString(), null);
if (get == null) {
get = new LinkedHashMap();
}
get.put(SMX.getSecondaryString(), SMX.getDistance());
lHMSMX.put(SMX.getPrimaryString(), get);
matrixUpdateList.put(matrixUpdateList.size() + 1, SMX);
}
} catch (InterruptedException | ExecutionException ex) {
Logger.getLogger(MYSQLDatahandler.class.getName()).log(Level.SEVERE, null, ex);
}
new Thread(() -> {
try {
if (!matrixUpdateList.isEmpty()) {
DataMapper.insertSementicMatrixes(matrixUpdateList);
System.out.println("finished datamapper semetic insert");
}
} catch (CustomError ex) {
Logger.getLogger(MYSQLDatahandler.class
.getName()).log(Level.SEVERE, null, ex);
}
}).start();
} else {
ConcurrentMap<Integer, String> stringCachelocal = stringCache;
ConcurrentMap<Integer, SimilarityMatrix> matrixUpdateList = new MapMaker().concurrencyLevel(2).makeMap();
ConcurrentMap<Integer, Future<SimilarityMatrix>> futures = new MapMaker().concurrencyLevel(2).makeMap();
Collection<String> values = multiprocessCalculations.values();
LinkedHashMap<String, LinkedHashMap<String, Double>> LHMSMXLocal = lHMSMX;
values.forEach((str) -> {
stringCachelocal.values().stream().filter((str1) -> (!str.equals(str1))).forEachOrdered((str1) -> {
boolean present = false;
if (LHMSMXLocal.containsKey(str)) {
LinkedHashMap<String, Double> orDefault = LHMSMXLocal.get(str);
if (orDefault.containsKey(str1)) {
present = true;
}
} else if (LHMSMXLocal.containsKey(str1)) {
LinkedHashMap<String, Double> orDefault = LHMSMXLocal.get(str1);
if (orDefault.containsKey(str)) {
present = true;
}
}
if (!present) {
SimilarityMatrix SMX = new SimilarityMatrix(str, str1);
Callable<SimilarityMatrix> worker = new SentimentAnalyzerTest(str, str1, SMX);
futures.put(futures.size() + 1, executor.submit(worker));
}
});
});
try {
System.out.println("finished worker assignment, futures size: " + futures.size() + "\n");
for (Future<SimilarityMatrix> future : futures.values()) {
SimilarityMatrix SMX = future.get();
LinkedHashMap<String, Double> get = lHMSMX.getOrDefault(SMX.getPrimaryString(), null);
if (get == null) {
get = new LinkedHashMap();
}
get.put(SMX.getSecondaryString(), SMX.getDistance());
lHMSMX.put(SMX.getPrimaryString(), get);
matrixUpdateList.put(matrixUpdateList.size() + 1, SMX);
}
} catch (InterruptedException | ExecutionException ex) {
Logger.getLogger(MYSQLDatahandler.class.getName()).log(Level.SEVERE, null, ex);
}
multiprocessCalculations = new MapMaker().concurrencyLevel(2).makeMap();
updatedRows = new ArrayList();
counter = 0;
new Thread(() -> {
try {
if (!matrixUpdateList.isEmpty()) {
DataMapper.insertSementicMatrixes(matrixUpdateList);
System.out.println("finished datamapper semetic insert");
}
} catch (CustomError ex) {
Logger.getLogger(MYSQLDatahandler.class
.getName()).log(Level.SEVERE, null, ex);
}
}).start();
} }
} }
} }
@ -434,16 +430,12 @@ public class MYSQLDatahandler {
System.out.println("none within 8 range"); System.out.println("none within 8 range");
ConcurrentMap<Integer, String> strCache = stringCache; ConcurrentMap<Integer, String> strCache = stringCache;
ConcurrentMap<Integer, Future<SimilarityMatrix>> futureslocal = new MapMaker().concurrencyLevel(2).makeMap(); ConcurrentMap<Integer, Future<SimilarityMatrix>> futureslocal = new MapMaker().concurrencyLevel(2).makeMap();
ExecutorService executor = new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
for (String str1 : strCache.values()) { for (String str1 : strCache.values()) {
if (!str.equals(str1)) { if (!str.equals(str1)) {
Callable<SimilarityMatrix> worker = new SentimentAnalyzerTest(str, str1, new SimilarityMatrix(str, str1)); Callable<SimilarityMatrix> worker = new SentimentAnalyzerTest(str, str1, new SimilarityMatrix(str, str1));
futureslocal.put(futureslocal.size() + 1, executor.submit(worker)); futureslocal.put(futureslocal.size() + 1, executor.submit(worker));
} }
} }
executor.shutdown();
int index = 0; int index = 0;
for (Future<SimilarityMatrix> future : futureslocal.values()) { for (Future<SimilarityMatrix> future : futureslocal.values()) {
try { try {
@ -469,13 +461,9 @@ public class MYSQLDatahandler {
int minDistance = 8; int minDistance = 8;
String similar = ""; String similar = "";
ConcurrentMap<Integer, Future<DistanceObject>> futures = new MapMaker().concurrencyLevel(2).makeMap(); ConcurrentMap<Integer, Future<DistanceObject>> futures = new MapMaker().concurrencyLevel(2).makeMap();
ExecutorService executor = new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
concurrentStrings.values().stream().map((str) -> new LevenshteinDistance(str, toBeCompared, new DistanceObject())).forEachOrdered((worker) -> { concurrentStrings.values().stream().map((str) -> new LevenshteinDistance(str, toBeCompared, new DistanceObject())).forEachOrdered((worker) -> {
futures.put(futures.size() + 1, executor.submit(worker)); futures.put(futures.size() + 1, executor.submit(worker));
}); });
executor.shutdown();
try { try {
for (Future<DistanceObject> future : futures.values()) { for (Future<DistanceObject> future : futures.values()) {
DistanceObject d = future.get(); DistanceObject d = future.get();

View File

@ -44,7 +44,11 @@ public class DiscordHandler {
} }
}).start(); }).start();
MYSQLDatahandler.shiftReduceParserInitiate(); MYSQLDatahandler.shiftReduceParserInitiate();
MYSQLDatahandler.instance.checkIfUpdateMatrixes(); MYSQLDatahandler.instance.instantiateExecutor();
while (MYSQLDatahandler.instance.getlHMSMXSize() < (MYSQLDatahandler.instance.getstringCacheSize()
* MYSQLDatahandler.instance.getstringCacheSize()) - MYSQLDatahandler.instance.getstringCacheSize()) {
MYSQLDatahandler.instance.checkIfUpdateMatrixes();
}
String token = "NTI5NzAxNTk5NjAyMjc4NDAx.Dw0vDg.7-aMjVWdQMYPl8qVNyvTCPS5F_A"; String token = "NTI5NzAxNTk5NjAyMjc4NDAx.Dw0vDg.7-aMjVWdQMYPl8qVNyvTCPS5F_A";
DiscordApi api = new DiscordApiBuilder().setToken(token).login().join(); DiscordApi api = new DiscordApiBuilder().setToken(token).login().join();
api.addMessageCreateListener(event -> { api.addMessageCreateListener(event -> {