CS 3308 Programming Assignment Unit 4

The document outlines a Python program for processing text documents to create an inverted index and calculate term frequencies, including TF-IDF values. It defines various functions for text normalization, tokenization, stop word removal, and database management using SQLite. The program also includes a main execution function that sets up the database, processes a corpus of documents, and reports statistics on the processed data.
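For reference, the weight this program stores in the Posting table is the standard tf-idf product used throughout the code: for a term t in document d, tf-idf(t, d) = tf(t, d) * ln(N / df(t)), where tf(t, d) is the number of occurrences of t in d, N is the total number of documents processed, and df(t) is the number of documents containing t. As an illustrative example with assumed numbers (not taken from the CACM corpus): a term occurring 4 times in one document of a 100-document collection, and appearing in 10 documents overall, receives a weight of 4 * ln(100/10) ≈ 9.21.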


import string

import sys
import os
import re
import math
import sqlite3
import time
from typing import Dict, Set, List
from collections import defaultdict, Counter
from nltk.stem import PorterStemmer

# Define stop words
stop_words = set(["a", "an", "the", "and", "or", "but", "is", "are", "was",
                  "were", "in", "of", "to", "with"])

# Compile regex patterns for efficiency
chars = re.compile(r'\W+')
pattid = re.compile(r'(\d{3})/(\d{3})/(\d{3})')

# Global counters for corpus statistics
tokens = 0
documents = 0
terms = 0
stop_word_count = 0

# Database to store term information
database: Dict[str, 'Term'] = {}

class Term:
    """
    Class to represent term information in the index
    Stores term frequency, document frequency, and posting information
    """

    def __init__(self):
        self.termid: int = 0
        self.termfreq: int = 0
        self.docs: int = 0
        self.docids: Dict[int, int] = {}

def splitchars(line: str) -> List[str]:
    """
    Split input text into tokens based on non-word characters
    Args:
        line: Input text string
    Returns:
        List of tokens
    """
    return chars.split(line)
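
# Illustrative example with an assumed input line (not taken from the corpus):
#     splitchars("Information retrieval, 1968.") -> ['Information', 'retrieval', '1968', '']
# Splitting on \W+ can leave an empty string when a line ends with punctuation;
# such empty tokens are dropped later by remove_punctuation_tokens and the
# empty-token check in parsetoken.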

def remove_stop_words(tokens: List[str]) -> List[str]:
    """Remove stop words from the token list and update the global stop word counter."""
    global stop_word_count
    filtered_tokens = [token for token in tokens if token not in stop_words]
    stop_word_count += len(tokens) - len(filtered_tokens)
    return filtered_tokens

ps = PorterStemmer()

def stem_tokens(tokens: List[str]) -> List[str]:
    """Reduce each token to its stem using the Porter stemmer."""
    return [ps.stem(token) for token in tokens]
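
# Illustrative example with assumed inputs:
#     stem_tokens(["algorithms", "processing", "index"]) -> ["algorithm", "process", "index"]
# using NLTK's Porter stemmer.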

def remove_punctuation_tokens(tokens: List[str]) -> List[str]:
    """Drop tokens that are empty or begin with a punctuation character."""
    return [token for token in tokens if token and token[0] not in
            string.punctuation]

def parsetoken(line: str) -> List[str]:
    """
    Process a line of text to extract and index terms
    Args:
        line: Input text line
    Returns:
        List of processed tokens
    """
    global documents, tokens, terms

    # Normalize input text
    line = line.replace('\t', ' ').strip()

    # Split into tokens
    token_list = splitchars(line)
    token_list = remove_stop_words(token_list)
    token_list = stem_tokens(token_list)
    token_list = remove_punctuation_tokens(token_list)

    for token in token_list:
        # Clean and normalize token
        token = token.replace('\n', '')
        lower_token = token.lower().strip()

        if not lower_token:  # Skip empty tokens
            continue

        tokens += 1  # Increment total token count

        # Add new term to database if not exists
        if lower_token not in database:
            terms += 1
            database[lower_token] = Term()
            database[lower_token].termid = terms
            database[lower_token].docids = {}
            database[lower_token].docs = 0

        # Update posting information
        if documents not in database[lower_token].docids:
            database[lower_token].docs += 1
            database[lower_token].docids[documents] = 0

        # Update term frequency
        database[lower_token].docids[documents] += 1
        database[lower_token].termfreq += 1

    return token_list
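
# Illustrative example with an assumed input: if the current document id is 7 and
# a line reads "index the index", the stop word "the" is removed (stop_word_count
# increases by 1), both remaining tokens stem to "index", and the in-memory entry
# ends up with database["index"].docids[7] == 2, while termfreq and the global
# token count each increase by 2.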

def process(filename: str) -> bool:
    """
    Process a single document file
    Args:
        filename: Path to document file
    Returns:
        Boolean indicating success
    """
    try:
        # print(f"Reading file: {filename}")
        with open(filename, 'r', encoding='utf-8') as file:
            for line in file:
                parsetoken(line)
        return True
    except IOError as e:
        print(f"Error processing file {filename}: {str(e)}")
        return False
    except UnicodeDecodeError:
        print(f"Unicode decode error in file {filename}")
        return False

def walkdir(cur: sqlite3.Cursor, dirname: str) -> bool:
    """
    Recursively walk through directory and process all files
    Args:
        cur: Database cursor
        dirname: Directory path
    Returns:
        Boolean indicating success
    """
    global documents

    try:
        # Get all files and directories
        all_items = [f for f in os.listdir(dirname)
                     if os.path.isdir(os.path.join(dirname, f))
                     or os.path.isfile(os.path.join(dirname, f))]

        for item in all_items:
            full_path = os.path.join(dirname, item)
            if os.path.isdir(full_path):
                print(f"Entering directory: {full_path}")
                walkdir(cur, full_path)
            else:
                # print(f"Processing file: {full_path}")
                documents += 1
                # Add document to dictionary
                cur.execute("INSERT INTO DocumentDictionary VALUES (?, ?)",
                            (full_path, documents))
                if not process(full_path):
                    print(f"Failed to process file: {full_path}")
        return True

    except Exception as e:
        print(f"Error walking directory {dirname}: {str(e)}")
        return False

def setup_database(cursor: sqlite3.Cursor):
    """
    Set up database tables and indexes
    Args:
        cursor: Database cursor
    """
    # Document Dictionary
    cursor.execute("DROP TABLE IF EXISTS DocumentDictionary")

    # Term Dictionary
    cursor.execute("DROP TABLE IF EXISTS TermDictionary")

    # Posting Table
    cursor.execute("DROP TABLE IF EXISTS Posting")

    # Create new tables
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS DocumentDictionary (
            DocumentName TEXT,
            DocId INTEGER PRIMARY KEY
        )
    """)

    cursor.execute("""
        CREATE TABLE IF NOT EXISTS TermDictionary (
            Term TEXT,
            TermId INTEGER PRIMARY KEY
        )
    """)

    cursor.execute("""
        CREATE TABLE IF NOT EXISTS Posting (
            TermId INTEGER,
            DocId INTEGER,
            tfidf REAL,
            docfreq INTEGER,
            termfreq INTEGER,
            FOREIGN KEY(TermId) REFERENCES TermDictionary(TermId),
            FOREIGN KEY(DocId) REFERENCES DocumentDictionary(DocId)
        )
    """)

    # Create indexes
    cursor.execute("CREATE INDEX IF NOT EXISTS idx_term ON TermDictionary(Term)")
    cursor.execute("CREATE INDEX IF NOT EXISTS idx_posting_term ON Posting(TermId)")
    cursor.execute("CREATE INDEX IF NOT EXISTS idx_posting_doc ON Posting(DocId)")

def calculate_frequencies():
    """
    Build per-document term frequency counters and per-term document frequencies
    from the in-memory index.
    """
    term_frequencies = defaultdict(Counter)  # doc_id -> Counter({term: frequency})
    document_frequencies = defaultdict(int)  # term -> number of documents containing it

    for term, term_obj in database.items():
        for doc_id, freq in term_obj.docids.items():
            term_frequencies[doc_id][term] = freq
            document_frequencies[term] += 1

    return term_frequencies, document_frequencies

def calculate_idf(document_frequencies, total_docs):
    """Compute the inverse document frequency idf(t) = ln(total_docs / df(t)) for every term."""
    idf = {}
    for term, df in document_frequencies.items():
        idf[term] = math.log(total_docs / df)
    return idf
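
# Illustrative example with assumed numbers: for total_docs = 100 and a term that
# appears in 10 documents, idf = math.log(100 / 10) = ln(10) ≈ 2.30.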

def calculate_tf_idf(term_frequencies, idf):
    """Combine per-document term frequencies with idf values into tf-idf weights."""
    tf_idf = {}

    for doc_id, tf in term_frequencies.items():
        tf_idf_doc = {}
        for term, freq in tf.items():
            tf_idf_doc[term] = freq * idf.get(term, 0)
        tf_idf[doc_id] = tf_idf_doc

    return tf_idf
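
# Illustrative result shape with assumed values: each entry of the returned
# structure maps a term to its tf-idf weight for one document, e.g.
#     {"algorithm": 9.21, "sort": 2.30}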

class InvertedIndex:
    """In-memory inverted index mapping each term to a postings list of (doc_id, weight) pairs."""

    def __init__(self):
        self.index = defaultdict(list)

    def add_document(self, doc_id, tf_idf):
        """Add one document's tf-idf weights to the postings lists."""
        for term, weight in tf_idf.items():
            self.index[term].append((doc_id, weight))
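
# Illustrative usage (this class is defined for the assignment but is not
# invoked by main() below):
#     inverted = InvertedIndex()
#     for doc_id, weights in tf_idf.items():
#         inverted.add_document(doc_id, weights)
#     # inverted.index["algorithm"] then holds a list of (doc_id, weight) pairs.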

def report_statistics():
    """Print summary statistics for the indexing run."""
    total_terms = sum(term.termfreq for term in database.values())
    unique_terms = len(database)
    print(f"Number of documents processed: {documents}")
    print(f"Total number of terms parsed from all documents: {tokens}")
    print(f"Total number of unique terms found and added to the index: {unique_terms}")
    print(f"Total number of terms found that matched one of the stop words: {stop_word_count}")

def main():
    """
    Main execution function
    """
    # Record start time
    start_time = time.localtime()
    print(f"Start Time: {start_time.tm_hour:02d}:{start_time.tm_min:02d}")

    # Initialize database
    db_path = "cacm_index.db"
    conn = sqlite3.connect(db_path)
    conn.isolation_level = None  # Enable autocommit
    cursor = conn.cursor()

    # Setup database tables
    setup_database(cursor)

    # Process corpus
    corpus_path = "./cacm"  # Update this path to match your environment
    if not os.path.exists(corpus_path):
        print(f"Error: Corpus directory not found at {corpus_path}")
        return

    walkdir(cursor, corpus_path)

    # Calculate tf-idf for each term in each document
    term_frequencies, document_frequencies = calculate_frequencies()
    idf = calculate_idf(document_frequencies, documents)
    tf_idf = calculate_tf_idf(term_frequencies, idf)

    # Insert terms into database
    for term, term_obj in database.items():
        cursor.execute("INSERT INTO TermDictionary (Term, TermId) VALUES (?, ?)",
                       (term, term_obj.termid))

        # Insert posting information
        for doc_id, freq in term_obj.docids.items():
            tfidf = freq * math.log(documents / term_obj.docs)
            cursor.execute("""
                INSERT INTO Posting
                (TermId, DocId, tfidf, docfreq, termfreq)
                VALUES (?, ?, ?, ?, ?)
            """, (term_obj.termid, doc_id, tfidf, term_obj.docs, freq))

    # Commit changes and close connection
    conn.commit()
    conn.close()

    # Print statistics
    report_statistics()

    end_time = time.localtime()
    print(f"\nEnd Time: {end_time.tm_hour:02d}:{end_time.tm_min:02d}")

if __name__ == '__main__':
    main()
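
To run the indexer (assuming the listing above is saved as, say, index_builder.py, that NLTK is installed via pip install nltk, and that the corpus files are unpacked into ./cacm next to the script), invoke python index_builder.py. The script creates cacm_index.db, fills the DocumentDictionary, TermDictionary and Posting tables, and prints the document, token, unique-term and stop-word counts produced by report_statistics().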
