Commit ef756d4b authored by Min Gao's avatar Min Gao
Browse files

hpc_vision

parent 96e0f05e
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
COMP90024 Cluster and Cloud Computing Assignment 2
Australian City Analytics
Team : Team 19
City :
Names :
Student IDs :
PURPOSE : import data to the couchdb database with suburb transfer
and emotion analysis
VERSION : 1
DATE : 5/2017
"""
import couchdb
import hashlib
import json
import suburb_detect
import IGA_detect as IGA_d
import emotion_analysis as ea
import os
from mpi4py import MPI
# DB_URL = "http://127.0.0.1:5984/"
# CouchDB server URL.  NOTE(review): currently empty — it must be filled in
# before running; the commented line above shows the usual local default.
DB_URL = ""
# DB_TWITTER = "test"
DB_TWEET_DATABASE = "db_tweets"  # database holding processed tweet documents
DB_USER_DATABASE = "db_users"  # database holding twitter user documents
DB_PLACE_DATABASE = "db_places"  # database holding twitter place documents
TWITTER_LOCATION = "bigTwitter.json"  # path to the raw twitter JSON dump
def get_database(db_name):
    """Return a handle to *db_name* on the CouchDB server at ``DB_URL``,
    creating the database first if it does not exist yet."""
    server = couchdb.Server(DB_URL)
    if db_name not in server:
        server.create(db_name)
    return server[db_name]
def processTwitter(db_tweets, db_users, db_places, start, end):
    """Ingest the byte range [start, end) of the twitter dump file into CouchDB.

    Each line of the file is expected to be a JSON object (possibly with a
    trailing comma) whose "json" key holds the raw tweet.  For every tweet we
    store the user and place documents separately (replaced by references in
    the tweet), tag the tweet with its suburb, IGA flag and sentiment, and
    save it.  Lines that are malformed or lack the required fields (e.g. no
    coordinates) are skipped — ingestion is best-effort.

    Parameters
    ----------
    db_tweets, db_users, db_places : couchdb.Database
        Target databases for tweet, user and place documents.
    start, end : int
        Byte offsets delimiting this process's slice of the dump file.
    """
    count = 0
    with open(TWITTER_LOCATION, "r", encoding="UTF-8") as twitterFile:
        twitterFile.seek(start)
        # Unless we start at byte 0 the seek probably landed mid-line:
        # discard the partial line (the previous worker owns it).
        if start != 0:
            twitterFile.readline()
        while twitterFile.tell() < end:
            line = twitterFile.readline()
            if not line:
                # BUG FIX: at EOF readline() returns "" and tell() no longer
                # advances — the original loop would spin forever whenever
                # `end` exceeded the file size.
                break
            try:
                # Parse the line exactly once instead of the original
                # json.dumps/json.loads round-trip for every field access.
                tweet = json.loads(line[:-1].strip(','))["json"]
                user = tweet["user"]
                user["_id"] = md5(user["id_str"])
                coordinate = tweet["coordinates"]["coordinates"]
                suburb = suburb_detect.which_suburb(coordinate)['name']
                iga = IGA_d.detect_IGA(coordinate)
                sentiment = ea.sentiment_analyze_text(tweet["text"])['sentiment']
                try:
                    db_users.save(user)
                except couchdb.ResourceConflict:
                    pass  # user already stored by an earlier line/process
                # Replace the embedded user document with a reference.
                tweet["user"] = user["_id"]
                place = tweet.get("place")
                if place is not None:
                    place["_id"] = md5(place["id"])
                    try:
                        db_places.save(place)
                    except couchdb.ResourceConflict:
                        pass  # place already stored
                    tweet["place"] = place["_id"]
                tweet["suburb"] = suburb
                tweet["IGA"] = iga
                tweet["sentiment"] = sentiment
                tweet["_id"] = md5(tweet["id_str"])
                db_tweets.save(tweet)
                count += 1
                print(count)
            except Exception:
                # Best-effort: skip malformed lines / tweets missing fields,
                # but — unlike the original bare `except:` — no longer swallow
                # KeyboardInterrupt or SystemExit.
                continue
def md5(to_md5):
    """Return the lowercase hex MD5 digest of *to_md5*.

    The value is first converted to ``str`` and ASCII-encoded, so numeric
    ids (e.g. tweet ids) work as well as strings.
    """
    digest = hashlib.md5(str(to_md5).encode('ascii'))
    return digest.hexdigest().lower()
def main():
    """Partition the tweet dump across MPI processes and ingest it.

    Rank 0 (master) computes the per-process byte chunk size, sends each
    worker its starting offset, and processes the first chunk itself.
    Every other rank receives its start offset and processes one chunk.
    """
    db_tweets = get_database(DB_TWEET_DATABASE)
    db_users = get_database(DB_USER_DATABASE)
    db_places = get_database(DB_PLACE_DATABASE)
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    processesNum = comm.Get_size()
    twitterSize = os.path.getsize(TWITTER_LOCATION)
    assignedJobSize = (twitterSize // processesNum)
    # master process
    if rank == 0:
        for process in range(1, processesNum):
            comm.send(assignedJobSize * process, dest = process)
        processTwitter(db_tweets, db_users, db_places, 0, assignedJobSize)
        print("_______________________ Final result _______________________")
        print("finish")
    # slave process
    else:
        startEle = comm.recv(source = 0)
        # BUG FIX: integer division drops up to processesNum-1 trailing
        # bytes of the file; the last rank must read to the true EOF
        # instead of stopping at startEle + assignedJobSize.
        if rank == processesNum - 1:
            endEle = twitterSize
        else:
            endEle = startEle + assignedJobSize
        processTwitter(db_tweets, db_users, db_places, startEle, endEle)
# Entry point: every MPI-launched process executes main() and takes the
# master or worker branch based on its rank.
if __name__ == '__main__':
    main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment