pipeline property updates, untokenizable removed, updating the matrix updater; CoreNLP's internal thread divider is very effective for mass annotation compared to simple streaming

jenzur 2019-03-26 21:38:03 +01:00
parent 17ef94ef07
commit 511eb0e492
3 changed files with 110 additions and 123 deletions
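The thread-divider claim in the message refers to the batch form of annotate adopted throughout this commit: wrap every input string in an Annotation, hand the whole collection to the pipeline once, and let the "threads" property fan the work out over CoreNLP's internal worker pool, rather than streaming one annotate(doc) call per string. A minimal sketch of that pattern (annotator and thread settings taken from the diff below; the class name and input strings are illustrative):

    import edu.stanford.nlp.pipeline.Annotation;
    import edu.stanford.nlp.pipeline.StanfordCoreNLP;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;

    public class BatchAnnotateSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("annotators", "tokenize,ssplit,pos,lemma,parse");
            props.setProperty("threads", "25"); // CoreNLP divides the batch across 25 workers
            StanfordCoreNLP pipeline = new StanfordCoreNLP(props);

            List<String> inputs = Arrays.asList("first message", "second message"); // illustrative
            List<Annotation> batch = new ArrayList<>();
            for (String str : inputs) {
                batch.add(new Annotation(str));
            }
            // One call annotates the whole batch in parallel, instead of
            // calling pipeline.annotate(anno) once per string.
            pipeline.annotate(batch);
        }
    }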

Datahandler.java

@@ -29,6 +29,7 @@ import java.io.StringReader;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -56,7 +57,7 @@ public class Datahandler {
public static final long EXPIRE_TIME_IN_SECONDS1 = TimeUnit.SECONDS.convert(10, TimeUnit.HOURS);
public static Datahandler instance = new Datahandler();
private volatile boolean refreshMatrixFromDB;
private static int secondaryIterator = 0;
private static volatile int secondaryIterator = 0;
private final ConcurrentMap<Integer, String> stringCache;
private static ConcurrentMap<String, Annotation> pipelineAnnotationCache;
private static ConcurrentMap<String, Annotation> pipelineSentimentAnnotationCache;
@@ -72,7 +73,7 @@ public class Datahandler {
private static String nerModel = "edu/stanford/nlp/models/ner/english.all.3class.distsim.crf.ser.gz";
private static MaxentTagger tagger;
private static ShiftReduceParser model;
private static String[] options = {"-maxLength", "100"};
private static String[] options = {"-maxLength", "90"};
private static Properties props = new Properties();
private static Properties propsSentiment = new Properties();
private static GrammaticalStructureFactory gsf;
@@ -112,8 +113,12 @@ public class Datahandler {
propsSentiment.setProperty("parse.model", lexParserEnglishRNN);
propsSentiment.setProperty("ner.model", nerModel);
propsSentiment.setProperty("sentiment.model", sentimentModel);
propsSentiment.setProperty("parse.maxlen", "100");
propsSentiment.setProperty("annotators", "tokenize,ssplit,pos,parse,depparse,sentiment"); //coref too expensive memorywise
propsSentiment.setProperty("parse.maxlen", "90");
propsSentiment.setProperty("threads", "25");
propsSentiment.setProperty("pos.maxlen", "90");
propsSentiment.setProperty("tokenize.maxlen", "90");
propsSentiment.setProperty("ssplit.maxlen", "90");
propsSentiment.setProperty("annotators", "tokenize,ssplit,pos,parse,sentiment"); //coref too expensive memorywise, does it need depparse?
propsSentiment.setProperty("tokenize.options", "untokenizable=firstDelete");
pipelineSentiment = new StanfordCoreNLP(propsSentiment);
tagger = new MaxentTagger(taggerPath);
@@ -121,8 +126,13 @@ public class Datahandler {
}).start();
new Thread(() -> {
props.setProperty("parse.model", shiftReduceParserPath);
props.setProperty("parse.maxlen", "100");
props.setProperty("parse.maxlen", "90");
props.setProperty("parse.binaryTrees", "true");
props.setProperty("threads", "25");
props.setProperty("pos.maxlen", "90");
props.setProperty("tokenize.maxlen", "90");
props.setProperty("ssplit.maxlen", "90");
props.setProperty("lemma.maxlen", "90");
props.setProperty("annotators", "tokenize,ssplit,pos,lemma,parse");
props.setProperty("tokenize.options", "untokenizable=firstDelete");
pipeline = new StanfordCoreNLP(props);
@@ -205,9 +215,17 @@ public class Datahandler {
}
public void addHLstatsMessages() {
ConcurrentMap<Integer, String> hlStatsMessages = DataMapper.getHLstatsMessages();
ConcurrentMap<Integer, String> hlStatsMessages = new MapMaker().concurrencyLevel(2).makeMap();
ConcurrentMap<Integer, String> strCacheLocal = stringCache;
//might want a hardcap
int hardcap = 8500;
int ij = 0;
for (String str : DataMapper.getHLstatsMessages().values()) {
hlStatsMessages.put(ij, str);
ij++;
if (ij > hardcap) {
break;
}
}
hlStatsMessages.values().parallelStream().forEach(str -> {
if (!str.startsWith("!")) {
String orElse = strCacheLocal.values().parallelStream().filter(e -> e.equals(str)).findAny().orElse(null);
@@ -229,19 +247,26 @@ public class Datahandler {
public void instantiateAnnotationMap() {
if (!stringCache.isEmpty()) {
ConcurrentMap<String, Annotation> Annotationspipeline = new MapMaker().concurrencyLevel(2).makeMap();
ConcurrentMap<String, Annotation> AnnotationspipelineSentiment = new MapMaker().concurrencyLevel(2).makeMap();
stringCache.values().parallelStream().forEach(str -> {
System.out.println("str annotation pipeline pipelinesentiment: " + str + "\n");
Annotation strAnno = new Annotation(str);
pipeline.annotate(strAnno);
pipelineAnnotationCache.put(str, strAnno);
Annotationspipeline.put(str, strAnno);
Annotation strAnno2 = new Annotation(str);
pipelineSentiment.annotate(strAnno2);
pipelineSentimentAnnotationCache.put(str, strAnno2);
AnnotationspipelineSentiment.put(str, strAnno2);
});
pipeline.annotate(Annotationspipeline.values());
pipelineSentiment.annotate(AnnotationspipelineSentiment.values());
Annotationspipeline.entrySet().forEach(pipelineEntry -> {
pipelineAnnotationCache.put(pipelineEntry.getKey(), pipelineEntry.getValue());
});
AnnotationspipelineSentiment.entrySet().forEach(pipelineEntry -> {
pipelineSentimentAnnotationCache.put(pipelineEntry.getKey(), pipelineEntry.getValue());
});
}
}
public synchronized void checkIfUpdateMatrixes() {
public synchronized void updateMatrixes() {
refreshMatrixFromDB = false;
if (stopwatch1.elapsed(TimeUnit.SECONDS) >= EXPIRE_TIME_IN_SECONDS1) {
refreshMatrixFromDB = true;
@@ -276,57 +301,54 @@ public class Datahandler {
selectUpdate = secondaryIterator;
secondaryIterator++;
}
ConcurrentMap<Integer, String> strIndexNavigator = new MapMaker().concurrencyLevel(2).makeMap();
String get = stringCachelocal.getOrDefault(selectUpdate, null);
if (get == null) {
get = stringCachelocal.get(new Random().nextInt(stringCachelocal.size() - 1));
}
strIndexNavigator.put(0, get);
final String getStringCacheStr = stringCachelocal.getOrDefault(selectUpdate, null);
ConcurrentMap<Integer, SimilarityMatrix> matrixUpdateList = new MapMaker().concurrencyLevel(2).makeMap();
ConcurrentMap<Integer, Future<SimilarityMatrix>> futures = new MapMaker().concurrencyLevel(2).makeMap();
strIndexNavigator.values().forEach((str) -> {
stringCachelocal.values().stream().filter((str1) -> (!str.equals(str1))).forEachOrdered((str1) -> {
boolean present = false;
LinkedHashMap<String, Double> orDefault = lHMSMX.getOrDefault(str, null);
stringCachelocal.values().forEach((str1) -> {
boolean present = false;
LinkedHashMap<String, Double> orDefault = lHMSMX.getOrDefault(getStringCacheStr, null);
if (orDefault != null) {
Iterator<String> strDefaultsItr = orDefault.keySet().iterator();
while (strDefaultsItr.hasNext()) {
String strkey = strDefaultsItr.next();
if (strkey.equals(str1)) {
present = true;
break;
}
}
}
if (!present) {
orDefault = lHMSMX.getOrDefault(str1, null);
if (orDefault != null) {
for (String strkey : orDefault.keySet()) {
if (strkey.equals(str1)) {
Iterator<String> strDefaultsItr = orDefault.keySet().iterator();
while (strDefaultsItr.hasNext()) {
String strkey = strDefaultsItr.next();
if (strkey.equals(getStringCacheStr)) {
present = true;
break;
}
}
}
if (!present) {
orDefault = lHMSMX.getOrDefault(str1, null);
if (orDefault != null) {
for (String strkey : orDefault.keySet()) {
if (strkey.equals(str)) {
present = true;
break;
}
}
}
}
if (!present) {
LinkedHashMap<String, Double> orDefault1 = lHMSMX.getOrDefault(getStringCacheStr, null);
if (orDefault1 == null) {
orDefault1 = new LinkedHashMap<String, Double>();
}
if (!present) {
LinkedHashMap<String, Double> orDefault1 = lHMSMX.getOrDefault(str, null);
if (orDefault1 == null) {
orDefault1 = new LinkedHashMap<String, Double>();
}
orDefault1.put(str1, 0.0);
lHMSMX.put(str, orDefault1);
SimilarityMatrix SMX = new SimilarityMatrix(str, str1);
Callable<SimilarityMatrix> worker = new SentimentAnalyzerTest(str, str1, SMX, jmweAnnotationCache.get(str),
jmweAnnotationCache.get(str1), pipelineAnnotationCache.get(str), pipelineAnnotationCache.get(str1),
pipelineSentimentAnnotationCache.get(str), pipelineSentimentAnnotationCache.get(str1));
futures.put(futures.size() + 1, executor.submit(worker));
}
});
orDefault1.put(str1, 0.0);
lHMSMX.put(getStringCacheStr, orDefault1);
SimilarityMatrix SMX = new SimilarityMatrix(getStringCacheStr, str1);
Callable<SimilarityMatrix> worker = new SentimentAnalyzerTest(getStringCacheStr, str1, SMX, jmweAnnotationCache.get(getStringCacheStr),
jmweAnnotationCache.get(str1), pipelineAnnotationCache.get(getStringCacheStr), pipelineAnnotationCache.get(str1),
pipelineSentimentAnnotationCache.get(getStringCacheStr), pipelineSentimentAnnotationCache.get(str1));
futures.put(futures.size() + 1, executor.submit(worker));
}
});
System.out.println("finished worker assignment, futures size: " + futures.size() + "\n");
for (Future<SimilarityMatrix> future : futures.values()) {
futures.values().parallelStream().forEach((future) -> {
SimilarityMatrix SMX = new SimilarityMatrix("", "");
try {
SMX = future.get(20, TimeUnit.SECONDS);
SMX = future.get(5, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException ex) {
Logger.getLogger(Datahandler.class.getName()).log(Level.SEVERE, null, ex);
SMX = null;
@@ -337,7 +359,7 @@ public class Datahandler {
lHMSMX.put(SMX.getPrimaryString(), getFuture);
matrixUpdateList.put(matrixUpdateList.size() + 1, SMX);
}
}
});
try {
if (!matrixUpdateList.isEmpty()) {
DataMapper.insertSementicMatrixes(matrixUpdateList);
@@ -357,8 +379,8 @@ public class Datahandler {
str = filterContent(str);
str = removeSlacks(str);
System.out.println("finished removeSlacks \n" + str.size() + "\n");
str = verifyCalculationFitness(str);
System.out.println("Check if updateString str size POST: " + str.size() + "\n");
str = annotationCacheUpdate(str);
System.out.println("annotationCacheUpdate str size POST: " + str.size() + "\n");
try {
DataMapper.InsertMYSQLStrings(str);
} catch (CustomError ex) {
@@ -694,67 +716,29 @@ public class Datahandler {
return strreturn;
}
private ConcurrentMap<Integer, String> verifyCalculationFitness(ConcurrentMap<Integer, String> strmap) {
ConcurrentMap<Integer, String> returnmap = new MapMaker().concurrencyLevel(2).makeMap();
ConcurrentMap<String, Annotation> pipelineAnnotateCachelcl = new MapMaker().concurrencyLevel(2).makeMap();
ConcurrentMap<String, Annotation> pipelineSentimentAnnotateCachelcl = new MapMaker().concurrencyLevel(2).makeMap();
ConcurrentMap<String, Annotation> jmweAnnotateCachelcl = new MapMaker().concurrencyLevel(2).makeMap();
private ConcurrentMap<Integer, String> annotationCacheUpdate(ConcurrentMap<Integer, String> strmap) {
ConcurrentMap<String, Annotation> jmweAnnotation = PipelineJMWESingleton.INSTANCE.getJMWEAnnotation(strmap.values());
for (Entry<String, Annotation> jmweitr : jmweAnnotation.entrySet()) {
jmweAnnotateCachelcl.put(jmweitr.getKey(), jmweitr.getValue());
}
strmap.values().parallelStream().forEach(strCache -> {
Annotation strAnno = new Annotation(strCache);
pipeline.annotate(strAnno);
pipelineAnnotateCachelcl.put(strCache, strAnno);
Annotation strAnno2 = new Annotation(strCache);
pipelineSentiment.annotate(strAnno2);
pipelineSentimentAnnotateCachelcl.put(strCache, strAnno2);
System.out.println("normal annotating strCache: " + strCache + "\n");
});
final ConcurrentMap<Integer, String> allStrings;
if (!stringCache.isEmpty()) {
allStrings = stringCache;
} else {
allStrings = strmap;
}
ConcurrentMap<Integer, Future<SimilarityMatrix>> futures = new MapMaker().concurrencyLevel(2).makeMap();
strmap.values().parallelStream().forEach((str) -> {
for (String str1 : allStrings.values()) {
Callable<SimilarityMatrix> worker = new SentimentAnalyzerTest(str, str1, new SimilarityMatrix(str, str1),
jmweAnnotateCachelcl.get(str), jmweAnnotateCachelcl.get(str1), pipelineAnnotateCachelcl.get(str),
pipelineAnnotateCachelcl.get(str1), pipelineSentimentAnnotateCachelcl.get(str),
pipelineSentimentAnnotateCachelcl.get(str1));
futures.put(futures.size() + 1, executor.submit(worker));
System.out.println("futures size in verify calcs: " + futures.size() + "\n");
}
});
futures.values().parallelStream().forEach((future) -> {
SimilarityMatrix get;
//turning from 20 to 5 might be risky?
try {
get = future.get(5, TimeUnit.SECONDS);
String addStr = get.getPrimaryString();
returnmap.put(returnmap.size() + 1, addStr);
System.out.println("returnmap adding: " + addStr + "\n");
} catch (InterruptedException | ExecutionException | TimeoutException ex) {
System.out.println("failed verification: " + ex.getMessage() + "\n");
}
});
jmweAnnotation = PipelineJMWESingleton.INSTANCE.getJMWEAnnotation(returnmap.values());
for (Entry<String, Annotation> jmweitr : jmweAnnotation.entrySet()) {
jmweAnnotationCache.put(jmweitr.getKey(), jmweitr.getValue());
}
returnmap.values().parallelStream().forEach(strCache -> {
stringCache.put(stringCache.size() + 1, strCache);
System.out.println("str annotation pipeline pipelinesentiment: " + strCache + "\n");
Annotation strAnno = new Annotation(strCache);
pipeline.annotate(strAnno);
pipelineAnnotationCache.put(strCache, strAnno);
Annotation strAnno2 = new Annotation(strCache);
pipelineSentiment.annotate(strAnno2);
pipelineSentimentAnnotationCache.put(strCache, strAnno2);
ConcurrentMap<String, Annotation> Annotationspipeline = new MapMaker().concurrencyLevel(2).makeMap();
ConcurrentMap<String, Annotation> AnnotationspipelineSentiment = new MapMaker().concurrencyLevel(2).makeMap();
strmap.values().parallelStream().forEach(str -> {
Annotation strAnno = new Annotation(str);
Annotationspipeline.put(str, strAnno);
Annotation strAnno2 = new Annotation(str);
AnnotationspipelineSentiment.put(str, strAnno2);
stringCache.put(stringCache.size() + 1, str);
});
return returnmap;
System.out.println("pre iterator annotation update \n");
pipeline.annotate(Annotationspipeline.values());
pipelineSentiment.annotate(AnnotationspipelineSentiment.values());
Annotationspipeline.entrySet().forEach(pipelineEntry -> {
pipelineAnnotationCache.put(pipelineEntry.getKey(), pipelineEntry.getValue());
});
AnnotationspipelineSentiment.entrySet().forEach(pipelineEntry -> {
pipelineSentimentAnnotationCache.put(pipelineEntry.getKey(), pipelineEntry.getValue());
});
return strmap;
}
}
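The matrix updater above now drains its workers in parallel with a short bounded wait per future (20 seconds down to 5). A minimal sketch of that collect-with-timeout pattern, assuming SimilarityMatrix workers already submitted to an ExecutorService as in updateMatrixes() (names are illustrative):

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    class DrainFuturesSketch {
        // Collects whatever finishes within the bound; slow or failed
        // comparisons are dropped instead of stalling the whole update.
        static List<SimilarityMatrix> drain(List<Future<SimilarityMatrix>> futures) {
            List<SimilarityMatrix> results = new CopyOnWriteArrayList<>(); // thread-safe for parallel adds
            futures.parallelStream().forEach(future -> {
                try {
                    results.add(future.get(5, TimeUnit.SECONDS)); // same 5-second bound as the diff
                } catch (InterruptedException | ExecutionException | TimeoutException ex) {
                    // skip this entry; the caller persists only completed matrixes
                }
            });
            return results;
        }
    }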

PipelineJMWESingleton.java

@@ -18,11 +18,8 @@ import edu.mit.jmwe.detect.MoreFrequentAsMWE;
import edu.mit.jmwe.detect.ProperNouns;
import edu.mit.jmwe.index.IMWEIndex;
import edu.mit.jmwe.index.MWEIndex;
import edu.stanford.nlp.ling.CoreAnnotation;
import edu.stanford.nlp.ling.CoreAnnotations;
import edu.stanford.nlp.ling.CoreAnnotations.SentencesAnnotation;
import edu.stanford.nlp.ling.CoreLabel;
import edu.stanford.nlp.ling.CoreLabel.GenericAnnotation;
import edu.stanford.nlp.ling.JMWEAnnotation;
import edu.stanford.nlp.pipeline.Annotation;
import edu.stanford.nlp.pipeline.StanfordCoreNLP;
@@ -31,6 +28,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentMap;
@@ -44,7 +42,6 @@ public class PipelineJMWESingleton {
//if not needed to be volatile dont make it, increases time
public volatile static PipelineJMWESingleton INSTANCE;
private volatile static int incrementer = 0;
private static StanfordCoreNLP localNLP = initializeJMWE();
private static String underscoreSpaceReplacement;
@@ -66,20 +63,21 @@ public class PipelineJMWESingleton {
try {
index.open();
} catch (IOException e) {
throw new RuntimeException("unable to open IMWEIndex index");
throw new RuntimeException("unable to open IMWEIndex index: " + e + "\n");
}
IMWEDetector detector = getDetector(index, detectorName);
ConcurrentMap<String, Annotation> returnAnnotations = new MapMaker().concurrencyLevel(2).makeMap();
Date startDate = new Date();
strvalues.parallelStream().forEach(str -> {
Annotation annoStr = new Annotation(str);
localNLP.annotate(annoStr);
returnAnnotations.put(str, annoStr);
});
localNLP.annotate(returnAnnotations.values());
returnAnnotations.values().parallelStream().forEach(annoStr -> {
for (CoreMap sentence : annoStr.get(CoreAnnotations.SentencesAnnotation.class)) {
List<IMWE<IToken>> mwes = getjMWEInSentence(sentence, index, detector, verbose);
sentence.set(JMWEAnnotation.class, mwes);
}
returnAnnotations.put(str, annoStr);
System.out.println("incrementer: " + incrementer + "\n");
incrementer++;
});
index.close();
return returnAnnotations;
@@ -90,6 +88,11 @@ public class PipelineJMWESingleton {
propsJMWE = new Properties();
propsJMWE.setProperty("annotators", "tokenize,ssplit,pos,lemma");
propsJMWE.setProperty("tokenize.options", "untokenizable=firstDelete");
propsJMWE.setProperty("threads", "25");
propsJMWE.setProperty("pos.maxlen", "90");
propsJMWE.setProperty("tokenize.maxlen", "90");
propsJMWE.setProperty("ssplit.maxlen", "90");
propsJMWE.setProperty("lemma.maxlen", "90");
underscoreSpaceReplacement = "-";
localNLP = new StanfordCoreNLP(propsJMWE);
System.out.println("finished singleton constructor \n");

DiscordHandler.java

@@ -52,11 +52,11 @@ public class DiscordHandler {
Datahandler.instance.updateStringCache();
//order matters
if (Datahandler.instance.getstringCacheSize() != 0) {
while (Datahandler.instance.getlHMSMXSize() * Datahandler.instance.getlHMSMXSize() * 2.5
while (Datahandler.instance.getlHMSMXSize() * Datahandler.instance.getlHMSMXSize() * 3
< (Datahandler.instance.getstringCacheSize()
* Datahandler.instance.getstringCacheSize())
- Datahandler.instance.getstringCacheSize()) {
Datahandler.instance.checkIfUpdateMatrixes();
Datahandler.instance.updateMatrixes();
}
}
String token = "NTI5NzAxNTk5NjAyMjc4NDAx.Dw0vDg.7-aMjVWdQMYPl8qVNyvTCPS5F_A";
@@ -91,7 +91,7 @@ public class DiscordHandler {
new Thread(() -> {
try {
Datahandler.instance.checkIfUpdateStrings(false);
Datahandler.instance.checkIfUpdateMatrixes();
Datahandler.instance.updateMatrixes();
} catch (CustomError ex) {
Logger.getLogger(DiscordHandler.class.getName()).log(Level.SEVERE, null, ex);
}
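For reference, the startup gate above measures matrix coverage against the n*n - n ordered pairs of the n cached strings: the loop keeps calling updateMatrixes() while 3*m*m is still below that pair count, so raising the factor from 2.5 to 3 lets startup finish with a sparser matrix. A rough sketch of the condition, with m = getlHMSMXSize() and n = getstringCacheSize():

    class StartupGateSketch {
        // keep updating while coverage is under ~1/3 of the n*n - n pairs;
        // the factor 3 here replaces the previous 2.5
        static boolean keepUpdating(long m, long n) {
            return m * m * 3 < n * n - n;
        }
    }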