0% found this document useful (0 votes)
38 views

CS 3308 Programming Assignment Unit 4

The document outlines a Python program for processing text documents to create an inverted index and calculate term frequencies, including TF-IDF values. It defines various functions for text normalization, tokenization, stop word removal, and database management using SQLite. The program also includes a main execution function that sets up the database, processes a corpus of documents, and reports statistics on the processed data.

Uploaded by

Reg
Copyright
© All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as DOCX, PDF, TXT or read online on Scribd
0% found this document useful (0 votes)
38 views

CS 3308 Programming Assignment Unit 4

The document outlines a Python program for processing text documents to create an inverted index and calculate term frequencies, including TF-IDF values. It defines various functions for text normalization, tokenization, stop word removal, and database management using SQLite. The program also includes a main execution function that sets up the database, processes a corpus of documents, and reports statistics on the processed data.

Uploaded by

Reg
Copyright
© All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as DOCX, PDF, TXT or read online on Scribd
You are on page 1/7

import string

import sys
import os
import re
import math
import sqlite3
import time
from typing import Dict, Set, List
from collections import defaultdict, Counter
from nltk.stem import PorterStemmer

# Stop words excluded from the index (all entries are lowercase).
stop_words = {
    "a", "an", "the", "and", "or", "but", "is", "are", "was",
    "were", "in", "of", "to", "with",
}

# Regex patterns, compiled once at import time.
# chars: a run of non-word characters, used as the token delimiter.
chars = re.compile(r'\W+')
# pattid: three 3-digit groups separated by slashes.
pattid = re.compile(r'(\d{3})/(\d{3})/(\d{3})')

# Running corpus statistics, updated while documents are parsed.
tokens = documents = terms = stop_word_count = 0

# In-memory index: maps each term string to its Term record.
database: Dict[str, 'Term'] = {}

class Term:
    """
    Index record for a single term.

    Attributes:
        termid: Numeric identifier assigned to the term.
        termfreq: Total number of occurrences of the term seen so far.
        docs: Number of distinct documents the term appears in.
        docids: Maps a document id to the term's occurrence count in it.
    """

    def __init__(self):
        self.termid: int = 0
        self.termfreq: int = 0
        self.docs: int = 0
        # A fresh dict per instance so postings are never shared.
        self.docids: Dict[int, int] = {}

    def __repr__(self) -> str:
        return (
            f"Term(termid={self.termid}, termfreq={self.termfreq}, "
            f"docs={self.docs}, docids={self.docids})"
        )

def splitchars(line: str) -> List[str]:
    """
    Split input text into tokens on runs of non-word characters.

    Args:
        line: Input text string
    Returns:
        List of tokens. When the line starts or ends with a delimiter the
        list contains empty strings at that edge; callers filter them out.
    """
    return chars.split(line)

def remove_stop_words(tokens: List[str]) -> List[str]:
    """
    Drop stop words from a token list and count how many were removed.

    The comparison is case-insensitive: the stop-word set holds lowercase
    entries, while tokens may still carry their original casing at this
    stage of the pipeline.

    Args:
        tokens: Tokens to filter
    Returns:
        The tokens that are not stop words, in their original order
    """
    global stop_word_count
    filtered_tokens = [
        token for token in tokens if token.lower() not in stop_words
    ]
    # Everything filtered out above was a stop-word hit.
    stop_word_count += len(tokens) - len(filtered_tokens)
    return filtered_tokens

# Single shared stemmer instance, reused for every token.
ps = PorterStemmer()


def stem_tokens(tokens: List[str]) -> List[str]:
    """
    Apply the Porter stemmer to every token.

    Args:
        tokens: Tokens to stem
    Returns:
        List with the stemmed form of each token, in the same order
    """
    return [ps.stem(token) for token in tokens]

def remove_punctuation_tokens(tokens: List[str]) -> List[str]:
    """
    Drop empty tokens and tokens that begin with a punctuation character.

    Args:
        tokens: Tokens to filter
    Returns:
        Tokens that are non-empty and whose first character is not in
        string.punctuation (which includes the underscore)
    """
    return [
        token for token in tokens
        if token and token[0] not in string.punctuation
    ]

def parsetoken(line: str) -> List[str]:
    """
    Process a line of text to extract and index terms.

    The line is normalized, split, stripped of stop words, stemmed and
    filtered; every surviving token is recorded in the global ``database``
    under the current document id (the global ``documents`` counter).

    Args:
        line: Input text line
    Returns:
        List of processed tokens
    """
    global documents, tokens, terms

    # Normalize input text. Lowercasing happens here, before stop-word
    # removal, so capitalized stop words ("The", "And") are matched too.
    line = line.replace('\t', ' ').strip().lower()

    # Split into tokens and run the filtering pipeline
    token_list = splitchars(line)
    token_list = remove_stop_words(token_list)
    token_list = stem_tokens(token_list)
    token_list = remove_punctuation_tokens(token_list)

    for token in token_list:
        # Clean and normalize token
        lower_token = token.replace('\n', '').lower().strip()
        if not lower_token:  # Skip empty tokens
            continue

        tokens += 1  # Increment total token count

        # Add new term to database if not exists. Term() already starts
        # with an empty posting dict and zero counts.
        entry = database.get(lower_token)
        if entry is None:
            terms += 1
            entry = Term()
            entry.termid = terms
            database[lower_token] = entry

        # Update posting information: first sighting in this document
        # bumps the document frequency.
        if documents not in entry.docids:
            entry.docs += 1
            entry.docids[documents] = 0
        # Update term frequency
        entry.docids[documents] += 1
        entry.termfreq += 1

    return token_list

def process(filename: str) -> bool:
    """
    Process a single document file, feeding each line to parsetoken().

    Args:
        filename: Path to document file
    Returns:
        True if the whole file was read, False if it could not be opened,
        read, or decoded as UTF-8 (an error message is printed).
    """
    try:
        with open(filename, 'r', encoding='utf-8') as file:
            for line in file:
                parsetoken(line)
    except OSError as e:
        print(f"Error processing file {filename}: {str(e)}")
        return False
    except UnicodeDecodeError:
        # NOTE: lines read before the undecodable bytes were already indexed.
        print(f"Unicode decode error in file {filename}")
        return False
    return True

def walkdir(cur: sqlite3.Cursor, dirname: str) -> bool:
    """
    Recursively walk through directory and process all files.

    Every regular file gets the next document id, is recorded in the
    DocumentDictionary table, and is then indexed via process().
    Entries are visited in sorted name order so document ids are
    reproducible between runs (os.listdir order is arbitrary).

    Args:
        cur: Database cursor
        dirname: Directory path
    Returns:
        False if this directory or any subdirectory could not be walked,
        True otherwise. A file that fails to process is reported but does
        not change the result.
    """
    global documents

    success = True
    try:
        for item in sorted(os.listdir(dirname)):
            full_path = os.path.join(dirname, item)
            if os.path.isdir(full_path):
                print(f"Entering directory: {full_path}")
                # Propagate failures from nested directories.
                if not walkdir(cur, full_path):
                    success = False
            elif os.path.isfile(full_path):
                documents += 1
                # Add document to dictionary
                cur.execute("INSERT INTO DocumentDictionary VALUES (?, ?)",
                            (full_path, documents))
                if not process(full_path):
                    print(f"Failed to process file: {full_path}")
            # Anything else (e.g. a dangling symlink) is skipped.
    except (OSError, sqlite3.Error) as e:
        print(f"Error walking directory {dirname}: {str(e)}")
        return False
    return success

def setup_database(cursor: sqlite3.Cursor) -> None:
    """
    Set up database tables and indexes, discarding any previous contents.

    Args:
        cursor: Database cursor
    """
    # Drop the child table first: Posting references the two dictionaries.
    cursor.execute("DROP TABLE IF EXISTS Posting")
    cursor.execute("DROP TABLE IF EXISTS TermDictionary")
    cursor.execute("DROP TABLE IF EXISTS DocumentDictionary")

    # Create new tables
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS DocumentDictionary (
            DocumentName TEXT,
            DocId INTEGER PRIMARY KEY
        )
    """)

    cursor.execute("""
        CREATE TABLE IF NOT EXISTS TermDictionary (
            Term TEXT,
            TermId INTEGER PRIMARY KEY
        )
    """)

    cursor.execute("""
        CREATE TABLE IF NOT EXISTS Posting (
            TermId INTEGER,
            DocId INTEGER,
            tfidf REAL,
            docfreq INTEGER,
            termfreq INTEGER,
            FOREIGN KEY(TermId) REFERENCES TermDictionary(TermId),
            FOREIGN KEY(DocId) REFERENCES DocumentDictionary(DocId)
        )
    """)

    # Create indexes
    cursor.execute(
        "CREATE INDEX IF NOT EXISTS idx_term ON TermDictionary(Term)")
    cursor.execute(
        "CREATE INDEX IF NOT EXISTS idx_posting_term ON Posting(TermId)")
    cursor.execute(
        "CREATE INDEX IF NOT EXISTS idx_posting_doc ON Posting(DocId)")

def calculate_frequencies():
    """
    Derive per-document term counts and per-term document counts from
    the global ``database``.

    The per-document counters are keyed by term so they can be combined
    with a term-keyed idf table; counters keyed by document id would never
    match an idf entry and every weight would come out as zero.

    Returns:
        Tuple (term_frequencies, document_frequencies):
        term_frequencies is a list with one Counter {term: count} per
        document that has at least one indexed term, in ascending
        document-id order; document_frequencies maps each term to the
        number of documents that contain it.
    """
    per_document = defaultdict(Counter)
    document_frequencies = defaultdict(int)

    for term, term_obj in database.items():
        for doc_id, freq in term_obj.docids.items():
            per_document[doc_id][term] = freq
            document_frequencies[term] += 1

    term_frequencies = [per_document[doc_id] for doc_id in sorted(per_document)]
    return term_frequencies, document_frequencies

def calculate_idf(document_frequencies, total_docs):
    """
    Compute the inverse document frequency of every term.

    Args:
        document_frequencies: Maps term -> number of documents containing it
        total_docs: Total number of documents in the corpus
    Returns:
        Dict mapping term -> log(total_docs / document frequency). Terms
        with a non-positive document frequency, or any term when total_docs
        is not positive, get 0.0 instead of raising.
    """
    idf = {}
    for term, df in document_frequencies.items():
        if df <= 0 or total_docs <= 0:
            # Avoid ZeroDivisionError / math domain error on degenerate input.
            idf[term] = 0.0
        else:
            idf[term] = math.log(total_docs / df)
    return idf

def calculate_tf_idf(term_frequencies, idf):
    """
    Weight raw term counts by inverse document frequency.

    Args:
        term_frequencies: List of mappings {key: count}
        idf: Maps key -> idf weight; keys missing from it are weighted 0
    Returns:
        List of dicts, parallel to term_frequencies, mapping each key to
        count * idf weight
    """
    return [
        {term: freq * idf.get(term, 0) for term, freq in tf.items()}
        for tf in term_frequencies
    ]

class InvertedIndex:
    """
    In-memory inverted index mapping each term to its weighted postings.

    Attributes:
        index: defaultdict mapping term -> list of (doc_id, weight) tuples,
            in the order documents were added.
    """

    def __init__(self):
        self.index = defaultdict(list)

    def add_document(self, doc_id, tf_idf):
        """
        Append one document's weights to the postings of each of its terms.

        Args:
            doc_id: Identifier of the document being added
            tf_idf: Mapping term -> weight for that document
        """
        for term, weight in tf_idf.items():
            self.index[term].append((doc_id, weight))

def report_statistics():
    """
    Print corpus statistics gathered during indexing: documents processed,
    tokens parsed, unique terms in the index, and stop-word matches.
    """
    unique_terms = len(database)
    print(f"Number of documents processed: {documents}")
    print(f"Total number of terms parsed from all documents: {tokens}")
    print("Total number of unique terms found and added to the index: "
          f"{unique_terms}")
    print("Total number of terms found that matched one of the stop words: "
          f"{stop_word_count}")

def main():
    """
    Main execution function.

    Builds the index for the corpus in ./cacm, stores the document
    dictionary, term dictionary and tf-idf weighted postings in the SQLite
    file cacm_index.db, then prints corpus statistics.
    """
    # Record start time
    start_time = time.localtime()
    print(f"Start Time: {start_time.tm_hour:02d}:{start_time.tm_min:02d}")

    # Check the corpus before touching the database: setup_database()
    # drops the existing tables, so a wrong path must not wipe an index.
    corpus_path = "./cacm"  # Update this path to match your environment
    if not os.path.exists(corpus_path):
        print(f"Error: Corpus directory not found at {corpus_path}")
        return

    # Initialize database
    db_path = "cacm_index.db"
    conn = sqlite3.connect(db_path)
    try:
        conn.isolation_level = None  # Enable autocommit
        cursor = conn.cursor()

        # Setup database tables
        setup_database(cursor)

        # Process corpus
        walkdir(cursor, corpus_path)

        # Inverse document frequency of every term, via the shared helpers
        _, document_frequencies = calculate_frequencies()
        idf = calculate_idf(document_frequencies, documents)

        # Insert terms into database
        for term, term_obj in database.items():
            cursor.execute(
                "INSERT INTO TermDictionary (Term, TermId) VALUES (?, ?)",
                (term, term_obj.termid))

            # Insert posting information
            for doc_id, freq in term_obj.docids.items():
                tfidf = freq * idf[term]
                cursor.execute("""
                    INSERT INTO Posting
                    (TermId, DocId, tfidf, docfreq, termfreq)
                    VALUES (?, ?, ?, ?, ?)
                """, (term_obj.termid, doc_id, tfidf, term_obj.docs, freq))

        # Commit changes
        conn.commit()
    finally:
        # Close the connection even if indexing fails part-way.
        conn.close()

    # Print statistics
    report_statistics()

    end_time = time.localtime()
    print(f"\nEnd Time: {end_time.tm_hour:02d}:{end_time.tm_min:02d}")

# Run the indexer only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
statistics

You might also like