-
Notifications
You must be signed in to change notification settings - Fork 0
/
SentimentAnalyzer.py
46 lines (39 loc) · 1.6 KB
/
SentimentAnalyzer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import sys
import os
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
from pyspark.streaming.kafka import KafkaUtils
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from elasticsearch import Elasticsearch
# Force Spark executors and the driver to use the same Python interpreter
# that is running this script (avoids worker/driver version mismatch errors).
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
# Module-level Elasticsearch client used by getSentiment() to persist results.
# NOTE(review): 'ElasticsearchURL' looks like a placeholder host — confirm the
# real cluster endpoint (and credentials, if any) before deploying.
es = Elasticsearch(['https://ElasticsearchURL:9200'])
#Predict the sentiment
def sentiment(text):
    """Classify *text* as "positive", "negative" or "neutral".

    Uses the VADER compound score: > 0 -> "positive", < 0 -> "negative",
    exactly 0 -> "neutral".
    """
    # PERF: reuse one analyzer across calls. The original rebuilt
    # SentimentIntensityAnalyzer (which re-loads the VADER lexicon from disk)
    # for every single tweet in the stream.
    analyzer = getattr(sentiment, "_analyzer", None)
    if analyzer is None:
        analyzer = SentimentIntensityAnalyzer()
        sentiment._analyzer = analyzer
    compound = analyzer.polarity_scores(text)["compound"]
    if compound > 0:
        return "positive"
    if compound < 0:
        return "negative"
    return "neutral"
# Persist every record of the batch RDD into Elasticsearch, so the sentiment
# results can later be visualized with Kibana. `time` is the batch timestamp
# supplied by foreachRDD and is unused here.
def getSentiment(time, rdd):
    # NOTE(review): collect() pulls the whole batch onto the driver — fine for
    # small batches, but a foreachPartition-based writer scales better.
    for record in rdd.collect():
        es.index(index="hash_tags_sentiment_analysis",
                 doc_type="tweet-sentiment-analysis", body=record)
#Using spark streaming to get the data from kafka and perform sentiment analysis.
if __name__ == "__main__":
    sc = SparkContext.getOrCreate()
    # 20-second micro-batches.
    ssc = StreamingContext(sc, 20)
    brokers = 'localhost:9092'
    topic = ["BLM"]
    kvs = KafkaUtils.createDirectStream(
        ssc, topic, {"metadata.broker.list": brokers})
    # BUG FIX: the original did str(x[1].encode("ascii", "ignore")), which on
    # Python 3 produces the bytes *repr* — every stored message was wrapped in
    # a literal "b'...'" prefix/quotes. Strip non-ASCII characters and decode
    # back to a plain str instead.
    tweets = kvs.map(
        lambda x: x[1].encode("ascii", "ignore").decode("ascii")
    ).map(
        lambda x: (x, sentiment(x), "#BLM")
    ).map(
        lambda x: {"message": x[0], "sentiment": x[1], "hashTag": x[2]}
    )
    # Write each batch's analyzed tweets to Elasticsearch.
    tweets.foreachRDD(getSentiment)
    ssc.start()
    ssc.awaitTermination()