From 511eb0e492d277147f6687ee3f58439b9dbc7a8f Mon Sep 17 00:00:00 2001 From: jenzur Date: Tue, 26 Mar 2019 21:38:03 +0100 Subject: [PATCH] pipeline propperty updates, untokenizeable removed, updating the matrixupdater, coreNLP internal thread devider is very effective for mass annotation compared to simple streaming --- .../main/java/FunctionLayer/Datahandler.java | 206 ++++++++---------- .../FunctionLayer/PipelineJMWESingleton.java | 21 +- .../PresentationLayer/DiscordHandler.java | 6 +- 3 files changed, 110 insertions(+), 123 deletions(-) diff --git a/ArtificialAutism/src/main/java/FunctionLayer/Datahandler.java b/ArtificialAutism/src/main/java/FunctionLayer/Datahandler.java index 66f2f16d..9971f3a7 100644 --- a/ArtificialAutism/src/main/java/FunctionLayer/Datahandler.java +++ b/ArtificialAutism/src/main/java/FunctionLayer/Datahandler.java @@ -29,6 +29,7 @@ import java.io.StringReader; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -56,7 +57,7 @@ public class Datahandler { public static final long EXPIRE_TIME_IN_SECONDS1 = TimeUnit.SECONDS.convert(10, TimeUnit.HOURS); public static Datahandler instance = new Datahandler(); private volatile boolean refreshMatrixFromDB; - private static int secondaryIterator = 0; + private static volatile int secondaryIterator = 0; private final ConcurrentMap stringCache; private static ConcurrentMap pipelineAnnotationCache; private static ConcurrentMap pipelineSentimentAnnotationCache; @@ -72,7 +73,7 @@ public class Datahandler { private static String nerModel = "edu/stanford/nlp/models/ner/english.all.3class.distsim.crf.ser.gz"; private static MaxentTagger tagger; private static ShiftReduceParser model; - private static String[] options = {"-maxLength", "100"}; + private static String[] options = {"-maxLength", "90"}; private static Properties props = new Properties(); private static Properties propsSentiment = new Properties(); private static GrammaticalStructureFactory gsf; @@ -112,8 +113,12 @@ public class Datahandler { propsSentiment.setProperty("parse.model", lexParserEnglishRNN); propsSentiment.setProperty("ner.model", nerModel); propsSentiment.setProperty("sentiment.model", sentimentModel); - propsSentiment.setProperty("parse.maxlen", "100"); - propsSentiment.setProperty("annotators", "tokenize,ssplit,pos,parse,depparse,sentiment"); //coref too expensive memorywise + propsSentiment.setProperty("parse.maxlen", "90"); + propsSentiment.setProperty("threads", "25"); + propsSentiment.setProperty("pos.maxlen", "90"); + propsSentiment.setProperty("tokenize.maxlen", "90"); + propsSentiment.setProperty("ssplit.maxlen", "90"); + propsSentiment.setProperty("annotators", "tokenize,ssplit,pos,parse,sentiment"); //coref too expensive memorywise, does it need depparse? propsSentiment.setProperty("tokenize.options", "untokenizable=firstDelete"); pipelineSentiment = new StanfordCoreNLP(propsSentiment); tagger = new MaxentTagger(taggerPath); @@ -121,8 +126,13 @@ public class Datahandler { }).start(); new Thread(() -> { props.setProperty("parse.model", shiftReduceParserPath); - props.setProperty("parse.maxlen", "100"); + props.setProperty("parse.maxlen", "90"); props.setProperty("parse.binaryTrees", "true"); + props.setProperty("threads", "25"); + props.setProperty("pos.maxlen", "90"); + props.setProperty("tokenize.maxlen", "90"); + props.setProperty("ssplit.maxlen", "90"); + props.setProperty("lemma.maxlen", "90"); props.setProperty("annotators", "tokenize,ssplit,pos,lemma,parse"); props.setProperty("tokenize.options", "untokenizable=firstDelete"); pipeline = new StanfordCoreNLP(props); @@ -205,9 +215,17 @@ public class Datahandler { } public void addHLstatsMessages() { - ConcurrentMap hlStatsMessages = DataMapper.getHLstatsMessages(); + ConcurrentMap hlStatsMessages = new MapMaker().concurrencyLevel(2).makeMap(); ConcurrentMap strCacheLocal = stringCache; - //might want a hardcap + int hardcap = 8500; + int ij = 0; + for (String str : DataMapper.getHLstatsMessages().values()) { + hlStatsMessages.put(ij, str); + ij++; + if (ij > hardcap) { + break; + } + } hlStatsMessages.values().parallelStream().forEach(str -> { if (!str.startsWith("!")) { String orElse = strCacheLocal.values().parallelStream().filter(e -> e.equals(str)).findAny().orElse(null); @@ -229,19 +247,26 @@ public class Datahandler { public void instantiateAnnotationMap() { if (!stringCache.isEmpty()) { + ConcurrentMap Annotationspipeline = new MapMaker().concurrencyLevel(2).makeMap(); + ConcurrentMap AnnotationspipelineSentiment = new MapMaker().concurrencyLevel(2).makeMap(); stringCache.values().parallelStream().forEach(str -> { - System.out.println("str annotation pipeline pipelinesentiment: " + str + "\n"); Annotation strAnno = new Annotation(str); - pipeline.annotate(strAnno); - pipelineAnnotationCache.put(str, strAnno); + Annotationspipeline.put(str, strAnno); Annotation strAnno2 = new Annotation(str); - pipelineSentiment.annotate(strAnno2); - pipelineSentimentAnnotationCache.put(str, strAnno2); + AnnotationspipelineSentiment.put(str, strAnno2); + }); + pipeline.annotate(Annotationspipeline.values()); + pipelineSentiment.annotate(AnnotationspipelineSentiment.values()); + Annotationspipeline.entrySet().forEach(pipelineEntry -> { + pipelineAnnotationCache.put(pipelineEntry.getKey(), pipelineEntry.getValue()); + }); + AnnotationspipelineSentiment.entrySet().forEach(pipelineEntry -> { + pipelineSentimentAnnotationCache.put(pipelineEntry.getKey(), pipelineEntry.getValue()); }); } } - public synchronized void checkIfUpdateMatrixes() { + public synchronized void updateMatrixes() { refreshMatrixFromDB = false; if (stopwatch1.elapsed(TimeUnit.SECONDS) >= EXPIRE_TIME_IN_SECONDS1) { refreshMatrixFromDB = true; @@ -276,57 +301,54 @@ public class Datahandler { selectUpdate = secondaryIterator; secondaryIterator++; } - ConcurrentMap strIndexNavigator = new MapMaker().concurrencyLevel(2).makeMap(); - String get = stringCachelocal.getOrDefault(selectUpdate, null); - if (get == null) { - get = stringCachelocal.get(new Random().nextInt(stringCachelocal.size() - 1)); - } - strIndexNavigator.put(0, get); + final String getStringCacheStr = stringCachelocal.getOrDefault(selectUpdate, null); ConcurrentMap matrixUpdateList = new MapMaker().concurrencyLevel(2).makeMap(); ConcurrentMap> futures = new MapMaker().concurrencyLevel(2).makeMap(); - strIndexNavigator.values().forEach((str) -> { - stringCachelocal.values().stream().filter((str1) -> (!str.equals(str1))).forEachOrdered((str1) -> { - boolean present = false; - LinkedHashMap orDefault = lHMSMX.getOrDefault(str, null); + stringCachelocal.values().forEach((str1) -> { + boolean present = false; + LinkedHashMap orDefault = lHMSMX.getOrDefault(getStringCacheStr, null); + if (orDefault != null) { + Iterator strDefaultsItr = orDefault.keySet().iterator(); + while (strDefaultsItr.hasNext()) { + String strkey = strDefaultsItr.next(); + if (strkey.equals(str1)) { + present = true; + break; + } + } + } + if (!present) { + orDefault = lHMSMX.getOrDefault(str1, null); if (orDefault != null) { - for (String strkey : orDefault.keySet()) { - if (strkey.equals(str1)) { + Iterator strDefaultsItr = orDefault.keySet().iterator(); + while (strDefaultsItr.hasNext()) { + String strkey = strDefaultsItr.next(); + if (strkey.equals(getStringCacheStr)) { present = true; break; } } } - if (!present) { - orDefault = lHMSMX.getOrDefault(str1, null); - if (orDefault != null) { - for (String strkey : orDefault.keySet()) { - if (strkey.equals(str)) { - present = true; - break; - } - } - } + } + if (!present) { + LinkedHashMap orDefault1 = lHMSMX.getOrDefault(getStringCacheStr, null); + if (orDefault1 == null) { + orDefault1 = new LinkedHashMap(); } - if (!present) { - LinkedHashMap orDefault1 = lHMSMX.getOrDefault(str, null); - if (orDefault1 == null) { - orDefault1 = new LinkedHashMap(); - } - orDefault1.put(str1, 0.0); - lHMSMX.put(str, orDefault1); - SimilarityMatrix SMX = new SimilarityMatrix(str, str1); - Callable worker = new SentimentAnalyzerTest(str, str1, SMX, jmweAnnotationCache.get(str), - jmweAnnotationCache.get(str1), pipelineAnnotationCache.get(str), pipelineAnnotationCache.get(str1), - pipelineSentimentAnnotationCache.get(str), pipelineSentimentAnnotationCache.get(str1)); - futures.put(futures.size() + 1, executor.submit(worker)); - } - }); + orDefault1.put(str1, 0.0); + lHMSMX.put(getStringCacheStr, orDefault1); + SimilarityMatrix SMX = new SimilarityMatrix(getStringCacheStr, str1); + Callable worker = new SentimentAnalyzerTest(getStringCacheStr, str1, SMX, jmweAnnotationCache.get(getStringCacheStr), + jmweAnnotationCache.get(str1), pipelineAnnotationCache.get(getStringCacheStr), pipelineAnnotationCache.get(str1), + pipelineSentimentAnnotationCache.get(getStringCacheStr), pipelineSentimentAnnotationCache.get(str1)); + futures.put(futures.size() + 1, executor.submit(worker)); + } }); System.out.println("finished worker assignment, futures size: " + futures.size() + "\n"); - for (Future future : futures.values()) { + futures.values().parallelStream().forEach((future) -> { SimilarityMatrix SMX = new SimilarityMatrix("", ""); try { - SMX = future.get(20, TimeUnit.SECONDS); + SMX = future.get(5, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException ex) { Logger.getLogger(Datahandler.class.getName()).log(Level.SEVERE, null, ex); SMX = null; @@ -337,7 +359,7 @@ public class Datahandler { lHMSMX.put(SMX.getPrimaryString(), getFuture); matrixUpdateList.put(matrixUpdateList.size() + 1, SMX); } - } + }); try { if (!matrixUpdateList.isEmpty()) { DataMapper.insertSementicMatrixes(matrixUpdateList); @@ -357,8 +379,8 @@ public class Datahandler { str = filterContent(str); str = removeSlacks(str); System.out.println("finished removeSlacks \n" + str.size() + "\n"); - str = verifyCalculationFitness(str); - System.out.println("Check if updateString str size POST: " + str.size() + "\n"); + str = annotationCacheUpdate(str); + System.out.println("annotationCacheUpdate str size POST: " + str.size() + "\n"); try { DataMapper.InsertMYSQLStrings(str); } catch (CustomError ex) { @@ -694,67 +716,29 @@ public class Datahandler { return strreturn; } - private ConcurrentMap verifyCalculationFitness(ConcurrentMap strmap) { - ConcurrentMap returnmap = new MapMaker().concurrencyLevel(2).makeMap(); - ConcurrentMap pipelineAnnotateCachelcl = new MapMaker().concurrencyLevel(2).makeMap(); - ConcurrentMap pipelineSentimentAnnotateCachelcl = new MapMaker().concurrencyLevel(2).makeMap(); - ConcurrentMap jmweAnnotateCachelcl = new MapMaker().concurrencyLevel(2).makeMap(); + private ConcurrentMap annotationCacheUpdate(ConcurrentMap strmap) { ConcurrentMap jmweAnnotation = PipelineJMWESingleton.INSTANCE.getJMWEAnnotation(strmap.values()); - for (Entry jmweitr : jmweAnnotation.entrySet()) { - jmweAnnotateCachelcl.put(jmweitr.getKey(), jmweitr.getValue()); - } - strmap.values().parallelStream().forEach(strCache -> { - Annotation strAnno = new Annotation(strCache); - pipeline.annotate(strAnno); - pipelineAnnotateCachelcl.put(strCache, strAnno); - Annotation strAnno2 = new Annotation(strCache); - pipelineSentiment.annotate(strAnno2); - pipelineSentimentAnnotateCachelcl.put(strCache, strAnno2); - System.out.println("normal annotating strCache: " + strCache + "\n"); - }); - final ConcurrentMap allStrings; - if (!stringCache.isEmpty()) { - allStrings = stringCache; - } else { - allStrings = strmap; - } - ConcurrentMap> futures = new MapMaker().concurrencyLevel(2).makeMap(); - strmap.values().parallelStream().forEach((str) -> { - for (String str1 : allStrings.values()) { - Callable worker = new SentimentAnalyzerTest(str, str1, new SimilarityMatrix(str, str1), - jmweAnnotateCachelcl.get(str), jmweAnnotateCachelcl.get(str1), pipelineAnnotateCachelcl.get(str), - pipelineAnnotateCachelcl.get(str1), pipelineSentimentAnnotateCachelcl.get(str), - pipelineSentimentAnnotateCachelcl.get(str1)); - futures.put(futures.size() + 1, executor.submit(worker)); - System.out.println("futures size in verify calcs: " + futures.size() + "\n"); - } - }); - futures.values().parallelStream().forEach((future) -> { - SimilarityMatrix get; - //turning from 20 to 5 might be risky? - try { - get = future.get(5, TimeUnit.SECONDS); - String addStr = get.getPrimaryString(); - returnmap.put(returnmap.size() + 1, addStr); - System.out.println("returnmap adding: " + addStr + "\n"); - } catch (InterruptedException | ExecutionException | TimeoutException ex) { - System.out.println("failed verification: " + ex.getMessage() + "\n"); - } - }); - jmweAnnotation = PipelineJMWESingleton.INSTANCE.getJMWEAnnotation(returnmap.values()); for (Entry jmweitr : jmweAnnotation.entrySet()) { jmweAnnotationCache.put(jmweitr.getKey(), jmweitr.getValue()); } - returnmap.values().parallelStream().forEach(strCache -> { - stringCache.put(stringCache.size() + 1, strCache); - System.out.println("str annotation pipeline pipelinesentiment: " + strCache + "\n"); - Annotation strAnno = new Annotation(strCache); - pipeline.annotate(strAnno); - pipelineAnnotationCache.put(strCache, strAnno); - Annotation strAnno2 = new Annotation(strCache); - pipelineSentiment.annotate(strAnno2); - pipelineSentimentAnnotationCache.put(strCache, strAnno2); + ConcurrentMap Annotationspipeline = new MapMaker().concurrencyLevel(2).makeMap(); + ConcurrentMap AnnotationspipelineSentiment = new MapMaker().concurrencyLevel(2).makeMap(); + strmap.values().parallelStream().forEach(str -> { + Annotation strAnno = new Annotation(str); + Annotationspipeline.put(str, strAnno); + Annotation strAnno2 = new Annotation(str); + AnnotationspipelineSentiment.put(str, strAnno2); + stringCache.put(stringCache.size() + 1, str); }); - return returnmap; + System.out.println("pre iterator annotation update \n"); + pipeline.annotate(Annotationspipeline.values()); + pipelineSentiment.annotate(AnnotationspipelineSentiment.values()); + Annotationspipeline.entrySet().forEach(pipelineEntry -> { + pipelineAnnotationCache.put(pipelineEntry.getKey(), pipelineEntry.getValue()); + }); + AnnotationspipelineSentiment.entrySet().forEach(pipelineEntry -> { + pipelineSentimentAnnotationCache.put(pipelineEntry.getKey(), pipelineEntry.getValue()); + }); + return strmap; } } diff --git a/ArtificialAutism/src/main/java/FunctionLayer/PipelineJMWESingleton.java b/ArtificialAutism/src/main/java/FunctionLayer/PipelineJMWESingleton.java index e6e0cc52..9998bc64 100644 --- a/ArtificialAutism/src/main/java/FunctionLayer/PipelineJMWESingleton.java +++ b/ArtificialAutism/src/main/java/FunctionLayer/PipelineJMWESingleton.java @@ -18,11 +18,8 @@ import edu.mit.jmwe.detect.MoreFrequentAsMWE; import edu.mit.jmwe.detect.ProperNouns; import edu.mit.jmwe.index.IMWEIndex; import edu.mit.jmwe.index.MWEIndex; -import edu.stanford.nlp.ling.CoreAnnotation; import edu.stanford.nlp.ling.CoreAnnotations; -import edu.stanford.nlp.ling.CoreAnnotations.SentencesAnnotation; import edu.stanford.nlp.ling.CoreLabel; -import edu.stanford.nlp.ling.CoreLabel.GenericAnnotation; import edu.stanford.nlp.ling.JMWEAnnotation; import edu.stanford.nlp.pipeline.Annotation; import edu.stanford.nlp.pipeline.StanfordCoreNLP; @@ -31,6 +28,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Date; import java.util.List; import java.util.Properties; import java.util.concurrent.ConcurrentMap; @@ -44,7 +42,6 @@ public class PipelineJMWESingleton { //if not needed to be volatile dont make it, increases time public volatile static PipelineJMWESingleton INSTANCE; - private volatile static int incrementer = 0; private static StanfordCoreNLP localNLP = initializeJMWE(); private static String underscoreSpaceReplacement; @@ -66,20 +63,21 @@ public class PipelineJMWESingleton { try { index.open(); } catch (IOException e) { - throw new RuntimeException("unable to open IMWEIndex index"); + throw new RuntimeException("unable to open IMWEIndex index: " + e + "\n"); } IMWEDetector detector = getDetector(index, detectorName); ConcurrentMap returnAnnotations = new MapMaker().concurrencyLevel(2).makeMap(); + Date startDate = new Date(); strvalues.parallelStream().forEach(str -> { Annotation annoStr = new Annotation(str); - localNLP.annotate(annoStr); + returnAnnotations.put(str, annoStr); + }); + localNLP.annotate(returnAnnotations.values()); + returnAnnotations.values().parallelStream().forEach(annoStr -> { for (CoreMap sentence : annoStr.get(CoreAnnotations.SentencesAnnotation.class)) { List> mwes = getjMWEInSentence(sentence, index, detector, verbose); sentence.set(JMWEAnnotation.class, mwes); } - returnAnnotations.put(str, annoStr); - System.out.println("incrementer: " + incrementer + "\n"); - incrementer++; }); index.close(); return returnAnnotations; @@ -90,6 +88,11 @@ public class PipelineJMWESingleton { propsJMWE = new Properties(); propsJMWE.setProperty("annotators", "tokenize,ssplit,pos,lemma"); propsJMWE.setProperty("tokenize.options", "untokenizable=firstDelete"); + propsJMWE.setProperty("threads", "25"); + propsJMWE.setProperty("pos.maxlen", "90"); + propsJMWE.setProperty("tokenize.maxlen", "90"); + propsJMWE.setProperty("ssplit.maxlen", "90"); + propsJMWE.setProperty("lemma.maxlen", "90"); underscoreSpaceReplacement = "-"; localNLP = new StanfordCoreNLP(propsJMWE); System.out.println("finished singleton constructor \n"); diff --git a/ArtificialAutism/src/main/java/PresentationLayer/DiscordHandler.java b/ArtificialAutism/src/main/java/PresentationLayer/DiscordHandler.java index dba53629..1861ce57 100644 --- a/ArtificialAutism/src/main/java/PresentationLayer/DiscordHandler.java +++ b/ArtificialAutism/src/main/java/PresentationLayer/DiscordHandler.java @@ -52,11 +52,11 @@ public class DiscordHandler { Datahandler.instance.updateStringCache(); //order matters if (Datahandler.instance.getstringCacheSize() != 0) { - while (Datahandler.instance.getlHMSMXSize() * Datahandler.instance.getlHMSMXSize() * 2.5 + while (Datahandler.instance.getlHMSMXSize() * Datahandler.instance.getlHMSMXSize() * 3 < (Datahandler.instance.getstringCacheSize() * Datahandler.instance.getstringCacheSize()) - Datahandler.instance.getstringCacheSize()) { - Datahandler.instance.checkIfUpdateMatrixes(); + Datahandler.instance.updateMatrixes(); } } String token = "NTI5NzAxNTk5NjAyMjc4NDAx.Dw0vDg.7-aMjVWdQMYPl8qVNyvTCPS5F_A"; @@ -91,7 +91,7 @@ public class DiscordHandler { new Thread(() -> { try { Datahandler.instance.checkIfUpdateStrings(false); - Datahandler.instance.checkIfUpdateMatrixes(); + Datahandler.instance.updateMatrixes(); } catch (CustomError ex) { Logger.getLogger(DiscordHandler.class.getName()).log(Level.SEVERE, null, ex); }