In unserem vorherigen Beitrag, habe ich einen Weg ausgearbeitet, um Echtzeit-Twitter-Daten zu extrahieren, und zwar mit Apache Flume. Derzeit habe ich eine Menge Daten von Twitter erhalten. Daher möchte ich diese analysieren und einige Trends daraus ableiten. Um eine Sentiment-Analyse der Twitter-Daten durchzuführen, werde ich ein weiteres Big-Data-Tool verwenden, Apache Spark.
Laut Hortonworks, “Apache Spark ist eine schnelle In-Memory-Datenverarbeitungs-Engine mit eleganten und ausdrucksstarken Entwicklungs-APIs, die es Datenanalysten ermöglichen, Streaming-, Machine-Learning- oder SQL-Workloads, die einen schnellen iterativen Zugriff auf Datensätze erfordern, effizient auszuführen. Mit Spark auf Apache Hadoop YARN können Entwickler überall nun Anwendungen erstellen, um die Leistung von Spark zu nutzen, Erkenntnisse zu gewinnen und ihre Data-Science-Workloads innerhalb eines einzigen, gemeinsam genutzten Datensatzes in Hadoop zu bereichern.”
Ein Spark-Programm kann in JAVA, Scala, Python oder R geschrieben werden. In diesem Fall werden wir JAVA zusammen mit Maven verwenden. Darüber hinaus wird Spark sowohl mit der HDP- als auch mit der Cloudera-Distribution ausgeliefert. Spark 2 ist die aktuell verwendete Version.
Um die Sentiment-Analyse mit Spark durchzuführen, erstelle ich ein neues Maven-Projekt. Ich nenne es Twitter Sentiment Analyzer’. Als Nächstes erstelle ich eine Klasse, “TwitterDataFlow.java”, in der ich alle erforderlichen Methoden implementieren werde.
Language Tool: Rechtschreibkorrektur
Anfänglich habe ich im POC festgestellt, dass die Ergebnisse der Sentiment-Analyse beeinträchtigt werden, wenn die Rechtschreibung in den Tweets fehlerhaft ist. Daher führe ich einen SpellChecker ein. Er wird uns helfen, die Rechtschreibung der Tweets zu korrigieren, bevor wir sie für die Sentiment-Analyse verwenden.
Dies werde ich tun, indem ich eine neue statische Methode namens ‘CorrectSpell’ erstelle. Darüber hinaus werde ich ein LanguageTool verwenden, um die Rechtschreibung zu überprüfen und zu korrigieren.
Laut LanguageTool’s GIT, “LanguageTool ist eine Open-Source-Korrektursoftware für Englisch, Französisch, Deutsch, Polnisch, Russisch und mehr als 20 weitere Sprachen. Sie findet viele Fehler, die eine einfache Rechtschreibprüfung nicht erkennen kann.”
Als Nächstes füge ich eine Abhängigkeit für das Language Tool in der pom.xml hinzu:
<dependency>
<groupId>org.languagetool</groupId>
<artifactId>language-en</artifactId>
<version>4.0</version>
</dependency>
Danach definiere ich eine statische Variable auf Klassenebene langTool der Klasse JLanguageTool. Dann initialisiere ich langTool mit einem Objekt der Klasse AmericanEnglish.
Als Nächstes kodiere ich die Methode namens SpellChecker mit der Eingabe als String text (normaler Text) und dem Rückgabetyp ebenfalls als String (Text mit korrekter Rechtschreibung). Ich verwende die check-Methode von JLanguageTool mit dem Parameter als ungeprüfter Text. Als Nächstes gibt sie eine Liste von RuleMatch zurück. Laut den JLanguageTool Java Docs bietet die Klasse RuleMatch “Informationen über eine Fehlerregel, die mit dem Text übereinstimmt, und die Position der Übereinstimmung.”
Anschließend definiere ich drei Variablen: ‘result’ vom Typ String, ‘lastPos’ vom Typ Integer, ‘tmp’ vom Typ String. Zudem erstelle ich bei jedem RuleMatch den Satz mit der ersten vorgeschlagenen Schreibweise des Tools neu. Damit habe ich die notwendigen Try-Catch-Blöcke überall dort hinzugefügt, wo sie erforderlich sind.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import org.languagetool.JLanguageTool; import org.languagetool.language.AmericanEnglish; import org.languagetool.rules.RuleMatch; public class LanguageCheck { static JLanguageTool langTool = new JLanguageTool(new AmericanEnglish()); public static String CorrectSpell(String text) { String query = text; try { List matches = langTool.check(query); String result = ""; int lastPos = 0; String tmp = ""; for (RuleMatch ma : matches) { try { tmp = ma.getSuggestedReplacements().get(0); result += query.substring(lastPos, ma.getFromPos()); result += tmp; lastPos = ma.getToPos(); } catch (Exception e) { return text; } } if (lastPos < query.length()) { result += query.substring(lastPos, query.length()); } return result; } catch (Exception e) { return text; } } } |
Sentiment-Analyzer: Stanford CoreNLP
Der nächste Schritt bei der Sentiment-Analyse mit Spark besteht darin, Stimmungen aus dem Text zu ermitteln. Um dies zu tun, verwende ich die Core-NLP-Bibliothek von Stanford, um Sentiment-Werte zu finden. Anschließend erstelle ich eine Klasse namens ‘StanfordSentiment’, in der ich die Bibliothek implementieren werde, um die Stimmungen in unserem Text zu finden.
Um das zu tun, füge ich die folgenden Abhängigkeiten in der Datei pom.xml hinzu:
<!– Dies ist die Stanford Core NLP-Bibliothek –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
</dependency>
<!– Dies ist die Modelldatei von Stanford Core NLP –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
<classifier>models</classifier>
</dependency>
Ich erstelle eine statische Objektvariable ‘props’, die Eigenschaften für die Pipeline von Stanford Core NLP definiert. Ich habe die minimalen Eigenschaften ausgewählt, um sie so leicht wie möglich zu machen. Danach setze ich die Annotatoren auf tokenize, ssplit, pos, parse, sentiment. Ich erstelle eine weitere statische Objektvariable ‘pipeline’ der Klasse StanfordCoreNLP. Schließlich initialisiere ich die Pipeline mit den ‘props’-Eigenschaften.
GetSentiment:
Ich habe eine Methode GetSentiment mit einem String als Eingabe und einem Double als Ausgabe erstellt. Ich verwende die Methode CorrectSpell, die ich in der Datei LanguageCheck.java erstellt habe. Die Methode CorrectSpell des LanguageCheck-Objekts gibt mir die korrekte Schreibweise des eingegebenen Tweets zurück. Ich verwende die Methode annotate von StanfordCoreNLP mit diesem korrigierten Text. Die von dieser Bibliothek ausgegebenen Sentiment-Werte sind:
0 => sehr negativ
1 => negativ
2 => neutral
3 => positiv
4 => sehr positiv
Ich subtrahiere das Ergebnis pro Satz um 2, um die folgenden neuen Sentiment-Kategorien zu erhalten:
-2 => sehr negativ
-1 => negativ
0 => neutral
1 => positiv
2 => sehr positiv
Ich gebe das Ergebnis des Sentiments jedes Tweets als Durchschnitt des Sentiments jedes Satzes des Tweets zurück. Tweets sind in keinem strukturierten Format verfasst. Daher kann ich keiner bestimmten Zeile eines Tweets ein höheres Gewicht als anderen zuweisen. Aus diesem Grund nehme ich an, dass jede Zeile in einem Tweet die gleiche Bedeutung hat. Ich gebe die Variable ‘total’ vom Typ Double zurück, die den resultierenden Sentiment-Wert des Tweets enthält.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
Paket com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import java.util.Properties; import edu.stanford.nlp.ling.CoreAnnotations.SentencesAnnotation; import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations; import edu.stanford.nlp.pipeline.Annotation; import edu.stanford.nlp.pipeline.StanfordCoreNLP; import edu.stanford.nlp.sentiment.SentimentCoreAnnotations; import edu.stanford.nlp.trees.Tree; import edu.stanford.nlp.util.CoreMap; import static com.CloudSigma.Spark.TwitterSentiment.LanguageCheck.*; public class StanfordSentiment { static Properties props = new Properties(); static { props.setProperty("annotators", "tokenize,ssplit,pos,parse,sentiment");} static StanfordCoreNLP pipeline = new StanfordCoreNLP(props); public static Double GetSentiment(String text) { String checkedText = CorrectSpell(text); Annotation document = new Annotation(checkedText); pipeline.annotate(document); List sentences = document.get(SentencesAnnotation.class); Double sum = 0.0; for (CoreMap sentence : sentences) { Tree tree = sentence.get(SentimentCoreAnnotations.SentimentAnnotatedTree.class); int sentiment = RNNCoreAnnotations.getPredictedClass(tree); int scaled = sentiment - 2; sum = sum + scaled; } Double total = sum / sentences.size(); System.out.println("Tweet-Text: " + checkedText); System.out.println("Sentiment-Wert: " + total); return total; } } |
Programmstart:
Nachdem diese beiden Klassen fertiggestellt sind, werden wir fortfahren, diese zu verwenden. Daher erstelle ich eine neue Klasse, “TwitterDataFlow.java”. Zuerst schreibe ich eine bedingte Prüfung, die das Programm nur ausführt, wenn die Anzahl der übergebenen Eingabeargumente genau 2 beträgt. Wenn die Anzahl der Argumente nicht gleich 2 ist, wird die Meldung zur fehlerhaften Verwendung ausgegeben und das Programm mit dem Exit-Status 1 beendet.
Ich erstelle eine SparkSession mit dem App-Namen Sentiment Analyzer. Ich setze die Eigenschaft der Hadoop-Konfiguration des Spark-Kontexts, “mapreduce input fileinputformat input dir recursive”, auf true. Dadurch kann ich Dateien rekursiv aus Ordnern abrufen. Ich erstelle eine Variable ‘inputPath’ der Klasse String, in der ich das Eingabeargument sowie ‘/*/*’ setze, wodurch ich die von Flume gespeicherten partitionierten Daten lesen kann. Ich lese die JSON-Daten von Flume in Dataset<Row> ‘data’.
Dann registriere ich eine UDF (User Defined Function) beim Spark SQL Context namens ‘Sentiment’, die einen String entgegennimmt, die GetSentiment-Methode von StanfordSentiment darauf anwendet und den Datentyp Double zurückgibt.
Derzeit habe ich Daten zu den Keywords Apple, Google, Tesla, Infosys, TCS, Oracle, Microsoft und Facebook von Flume. Daher erstelle ich eine Liste von Strings mit diesen Keywords. Für jedes dieser Unternehmen führe ich die folgenden Operationen aus.
Ablauf der Operationen:
Zuerst erstelle ich einen outPath, in dem ich die Ergebnisse speichern möchte. Ich erstelle eine temporäre Ansicht ‘complete’ über dem Datensatz ‘data’. Als Nächstes extrahiere ich Timestamp, partitionBy (um die Daten beim Speichern der Ergebnisse zu partitionieren), text, main_text (zur Verwendung für reguläre Ausdrücke) und followers aus den Daten.
Ich erstelle eine temporäre Ansicht über die Ergebnisse und filtere die Daten des jeweiligen Unternehmens heraus. Außerdem wende ich die Sentiment-UDF an, die mir die Sentiment-Werte in der Spalte ‘seVal’ zurückgibt. Ich persistiere die serialisierten Daten im Speicher und als Disk-Spill. Aus diesen Daten erhalte ich NetSentiment, das Produkt aus der Anzahl der Follower und dem Sentiment-Wert dieses Tweets. Dies hilft dabei, den Einfluss zu bestimmen, den dieser Tweet haben kann. Ich kann dafür verschiedene Formeln verwenden.
Wir persistieren die serialisierten Daten im Speicher und auf der Festplatte, da wir das gesamte Ergebnis speichern möchten, da die Sentiment-Analyse eine rechenintensive Aufgabe ist. Wenn wir dies nicht persistieren und planen, mehrere Formeln zur Berechnung des NetSentiment oder des Einflusses zu verwenden, würde in der vorherigen Abfrage, in der wir den Wert der Sentiment-Methode mehrmals verwenden würden, die Sentiment-Analyse des Tweets mehrmals durchgeführt werden.
Datengruppierung:
Nun gruppiere ich die Daten nach Zeitstempel und der partitionBy-Spalte und bilde den Durchschnitt des NetSentiment mit dieser Gruppierung. Dies gibt mir den durchschnittlichen Einfluss des Unternehmens, positiv oder negativ, in einer bestimmten Minute. Folglich schreibe ich die Ergebnisse für jedes Unternehmen in outPath und partitioniere sie nach der partitionBy-Spalte.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.Arrays; import java.util.List; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.storage.StorageLevel; import static com.CloudSigma.Spark.TwitterSentiment.StanfordSentiment.*; public class TwitterDataFlow { public static void main(String[] args) { if (args.length != 2) { System.out.println("Ungültige Anzahl an Argumenten! \n"); System.out.println( "Verwendung: Input Output \n " + "Input: Ort, von dem die partitionierten Daten gelesen werden müssen" + "Output: Wo das Endergebnis gespeichert werden soll"); System.exit(1); } SparkSession spark = SparkSession.builder().appName("Sentiment Analyzer").getOrCreate(); spark.sparkContext().hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true"); String inputPath = args[0] + "/*/*"; Dataset data = spark.read().json(inputPath); spark.sqlContext().udf().register("Sentiment", (String s) -> GetSentiment(s), DataTypes.DoubleType); List list = Arrays.asList("apple", "google", "tesla", "infosys", "tcs", "oracle", "microsoft", "facebook"); String query; String outPath; Dataset result; for (String company : list) { outPath = args[1] + "/" + company; data.createOrReplaceTempView("complete"); // tmp1 extrahiert TimeStamp, partitionBy (Date), Tweet-Text, Tweet-Text in Kleinschreibung // und followers_count des Users, der den Tweet verfasst hat Dataset tmp1 = spark.sql( "select concat(substr(created_at,5,6), substr(created_at,26,5),' ',substr(created_at,12,6),'00') as timestamp,substr(created_at,5,6) as partitionBy,text,lower(text) as main_text,user.followers_count as followers from complete"); tmp1.createOrReplaceTempView("tmp"); // Filtern von Tweets, die bestimmte Firmennamen enthalten Dataset tmp2 = spark.sql("select * from tmp where main_text regexp '(" + company + ")'"); // Erstellt eine View namens twitter tmp2.createOrReplaceTempView("twitter"); // tmp3 enthält alle ausgewählten Daten zusammen mit dem Sentiment-Wert der // Tweets Dataset tmp3 = spark.sql("select *, Sentiment(text) as seVal from twitter"); tmp3.persist(StorageLevel.MEMORY_AND_DISK()); // Erstellen einer weiteren View tmp3.createOrReplaceTempView("dataSe"); Dataset net = spark.sql( "select *,followers*seVal as NetSentiment from dataSe"); // Erstellen einer finalen View zum Speichern der Daten net.createOrReplaceTempView("final"); // Durchschnittsbildung der Sentiment-Werte pro Minute durch Gruppierung der Daten darauf query = "select timestamp,partitionBy,AVG(NetSentiment) from final group by timestamp,partitionBy"; // Speichern des Ergebnisses im Result-Dataset result = spark.sql(query); // Schreiben des Ergebnisses auf die Festplatte result.write().partitionBy("partitionBy").json(outPath); } } } |
Außerdem exportiere ich nach Fertigstellung des Codes ein ausführbares Jar mit allen darin enthaltenen Abhängigkeiten und kopiere es auf den Server, auf dem ich diesen Job ausführen möchte. Als Nächstes kann ich es mit dem folgenden Befehl übermitteln:
|
1 |
spark-submit --master yarn --deploy-mode cluster SentimentAnalyzer.jar /flume/Twitter/PublicStream/ /flume/output |
wobei (Quelle):
--master: Die Master-URL für den Cluster (z. B.spark://23.195.26.187:7077)
--deploy-mode: Ob Ihr Driver auf den Worker-Nodes (cluster) oder lokal als externer Client (client) (Standard:client)application-jar: Pfad zu einem gebündelten Jar, das Ihre Anwendung und alle Abhängigkeiten enthält. Berücksichtigen Sie, dass die URL innerhalb Ihres Clusters global sichtbar sein muss, beispielsweise einhdfs://-Pfad oder einfile://-Pfad, der auf allen Nodes vorhanden ist.application-arguments: Argumente, die an die Main-Methode Ihrer Hauptklasse übergeben werden, falls vorhanden. Hier Eingabe- und Ausgabepfad
Letzte Schritte:
Dann erhalten wir die Ergebnisse der Sentiment-Analyse mit Spark aus dem Ausgabepfad. Dies ist beispielsweise ein mögliches Ergebnis für Apple:
{“timestamp”:”Apr 30 2018 20:31:00″,”avg(NetSentiment)”:-3678.768518518518}
{“timestamp”:”Apr 30 2018 20:32:00″,”avg(NetSentiment)”:-883.002824858757}
Ich habe diese Anwendung auf CloudSigma mit einem HDP-Cluster aus 5 Nodes bereitgestellt. Konkret hatte jeder Node die folgende Konfiguration:
256 GB SSD
16 GB RAM
20 GHz CPU
Alles in allem konnte ich die Ergebnisse der Sentiment-Analyse mit Spark in etwa 19 Stunden erhalten. Darüber hinaus habe ich fortgeschrittenere Berechnungen als das Programm über einen Datensatz von mehr als 80 GB durchgeführt.
Der Code ist zu finden auf GITHUB.
Kommentare
Noch keine Kommentare. Schreiben Sie den ersten.