Jnsa

Download as pdf or txt
Download as pdf or txt
You are on page 1of 34

DISTRIBUTED SYSTEMS (CO433)

LAB MANUAL

Subject Code: CO433


Subject Name: Distributed Systems
Year: 4th Year/7th Semester

Submitted by:
ANAND JHA
2K21/EE/42

Submitted to:
Mr. Preeti Yadav

Department of Computer Science Engineering


Delhi Technological University

Delhi Technological University


Shahbad Daulatpur, Main Bawana Road, Delhi-110042
November, 2024

1
INDEX

S.NO EXPERIMENT DATE SIGN

1) Write a program to implement concurrent day-time


client-server application

2) Write a program to implement a Lamport Logical


clock in a Distributed system

3) Write a program to implement concept of Mutual


Exclusion using centralized algorithm

4) Write a program to implement Bully Election


algorithm in Distributed systems

5) Write a program to implement Bi-directional Ring


election algorithm

6) Write a program to implement 2-Phase commit


protocol

7) Write a program to implement 3-Phase commit


protocol

2
EXPERIMENT-1

Aim:
Implement concurrent day-time client-server application.

Theory:

• This experiment demonstrates a basic client-server interaction using TCP (Transmission
Control Protocol) stream sockets. TCP establishes a connection before data transfer and
provides reliable, ordered delivery. UDP (User Datagram Protocol), by contrast, is
connectionless, allowing messages (datagrams) to be sent without prior setup. In
distributed systems, UDP is efficient for low-latency communication where reliability isn't
crucial, as it lacks built-in error correction or message acknowledgment; the day-time
service below uses TCP so that the timestamp is delivered reliably.

 Socket Concept: A socket is a communication endpoint defined by an IP address and port


number. It allows inter-process communication over a network. The application uses
socket(), bind(), listen(), and accept() functions to establish connections, accept
incoming client requests, and manage responses.

 Process Flow:

 Server binds to a specified address, listens for incoming requests, and responds with
date and time.
• Clients connect to the server and receive a timestamped message over the established
TCP connection; the server then closes that connection and waits for the next client.

CODE:
File: server.c

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <time.h>
/*
 * Day-time server.
 *
 * Listens on TCP port 5600 on every local interface. For each accepted
 * connection it writes the current local time as produced by ctime()
 * (24 characters followed by CRLF) and then closes that connection.
 *
 * The process exits with EXIT_FAILURE when the listening socket cannot be
 * created, bound or put into listening state, or when accept() fails with
 * anything other than EINTR.
 */
int main(void)
{
    struct sockaddr_in sa; // Address the listening socket is bound to
    char str[1025];        // Buffer to hold the out-going stream
    time_t tick;           // System time data structure
    int sockfd;            // Listening socket
    int connfd;            // Socket of the client currently being served

    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
    {
        printf("Error in creating socket\n");
        exit(EXIT_FAILURE);
    }
    printf("Socket Created\n");

    // Clearing and assigning type and address to the socket
    memset(&sa, 0, sizeof(sa));
    memset(str, 0, sizeof(str)); // zero bytes, not the character '0'
    sa.sin_family = AF_INET;
    sa.sin_port = htons(5600);
    sa.sin_addr.s_addr = htonl(INADDR_ANY);

    // A server that failed to bind must not go on to listen/accept
    if (bind(sockfd, (struct sockaddr *)&sa, sizeof(sa)) < 0)
    {
        printf("Bind Error\n");
        close(sockfd);
        exit(EXIT_FAILURE);
    }
    printf("Binded\n");

    // starts the server with a max client queue size set as 10
    if (listen(sockfd, 10) < 0)
    {
        printf("Listen Error\n");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    for (;;)
    {
        connfd = accept(sockfd, NULL, NULL); // Accept a request from client
        if (connfd < 0)
        {
            if (errno == EINTR)
                continue; // interrupted by a signal, simply retry
            printf("Accept Error\n");
            break;
        }
        printf("Accepted\n");

        tick = time(NULL);
        snprintf(str, sizeof(str), "%.24s\r\n", ctime(&tick)); // read sys time and write to buffer
        if (write(connfd, str, strlen(str)) < 0)
        {
            printf("Write Error\n");
        }
        else
        {
            printf("sent\n");
            printf("%s\n", str);
        }

        // Closing the per-client socket signals end-of-message to the client
        // and keeps the server from running out of descriptors.
        close(connfd);
    }

    close(sockfd); // only reached when accept() failed permanently
    return EXIT_FAILURE;
}

File: client.c
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <netdb.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <arpa/inet.h>
/*
 * Day-time client.
 *
 * Connects to the day-time server on TCP port 5600 of the local host, reads
 * one message from it and prints that message.
 *
 * Exits with EXIT_FAILURE if the socket cannot be created, the connection
 * cannot be established, or the read fails.
 */
int main(void)
{
    struct sockaddr_in sa; // Socket address data structure
    char buff[1025];       // buffer to store the read stream
    ssize_t n;             // number of bytes read
    int sockfd;            // client socket

    sockfd = socket(PF_INET, SOCK_STREAM, 0); // New socket created
    if (sockfd < 0)
    {
        printf("Error in creation\n");
        exit(EXIT_FAILURE);
    }
    printf("Socket created\n");

    // Clearing and assigning type and address to the socket
    memset(&sa, 0, sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_port = htons(5600);
    sa.sin_addr.s_addr = htonl(INADDR_LOOPBACK); // server runs on this machine

    // establishing and verifying the connection
    if (connect(sockfd, (struct sockaddr *)&sa, sizeof(sa)) < 0)
    {
        printf("Connection failed\n");
        close(sockfd);
        exit(EXIT_FAILURE);
    }
    printf("Connection made\n");

    // Leave one byte free so the received data can be NUL-terminated before
    // it is printed with %s.
    n = read(sockfd, buff, sizeof(buff) - 1);
    if (n < 0)
    {
        printf("Read Error\n");
        close(sockfd);
        exit(EXIT_FAILURE);
    }
    buff[n] = '\0';

    printf("Read message: %s\n", buff);
    printf("%s\n", buff);
    printf("Done with connection, exiting\n");

    close(sockfd); // Closing the socket
    return 0;
}

OUTPUT:

Server

Client

5
EXPERIMENT-2

Aim:
Write a program to implement a Lamport Logical clock in a Distributed system

Theory:
 Lamport clocks are a logical mechanism for ordering events in distributed systems without
synchronized physical clocks. This system provides a partial ordering of events based on
message exchanges and is crucial in distributed transactions where events need a consistent
timeline.
 Algorithm:

 Increment on Event: Each process increments its counter on internal events.


 Message Sending: When a message is sent, the sender’s timestamp is attached.
 Message Receiving: On receipt, the receiver sets its clock to the maximum of its
current time and the received timestamp, then increments it.

 Usage: This logical ordering is used in distributed systems requiring consistency, such as
databases or distributed file systems, where conflicts may arise if operations aren’t correctly
ordered.

CODE:
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <time.h>

#define TRUE 1
#define FALSE 0
#define ML 1024
#define MPROC 32

/* Logical clock of one process: a single monotonically advanced counter. */
typedef struct lamport_clock
{
    int timer; /* current logical time */
} lamport_clock;

/* Resets the clock pointed to by clk to logical time 0. */
void init(lamport_clock *clk)
{
    clk->timer = 0;
}

/* Advances the clock pointed to by clk by `phase` units. */
void tick(lamport_clock *clk, int phase)
{
    clk->timer += phase;
}

/*
 * Converts the leading decimal digits of `str` to an int.
 *
 * At most `n` characters are examined; conversion stops early at the first
 * character that is not a decimal digit (including the terminating NUL), so
 * the function never reads past the end of a shorter string and never folds
 * non-digit characters into the result.
 *
 * Returns the converted value, or 0 when no digit was consumed.
 */
int str_to_int(const char *str, int n)
{
    int x = 0;
    int i;

    for (i = 0; i < n; i++)
    {
        if (str[i] < '0' || str[i] > '9')
            break;
        x = x * 10 + (str[i] - '0');
    }
    return x;
}

/* Overwrites the logical time stored in *clk with new_time. */
void update_clock(lamport_clock *clk, int new_time)
{
    lamport_clock replacement;

    replacement.timer = new_time;
    *clk = replacement;
}

/*
 * Creates a UDP socket and binds it to local port `connect_to` on every
 * interface. Nothing is connected here: the socket is only bound so that the
 * node can receive datagrams on that port.
 *
 * Terminates the process with EXIT_FAILURE when socket() or bind() fails.
 * Returns the descriptor of the bound socket.
 */
int connect_to_port(int connect_to)
{
    const int reuse = 1;
    struct sockaddr_in local;
    int fd = socket(AF_INET, SOCK_DGRAM, 0);

    if (fd < 0)
    {
        perror("unable to create a socket");
        exit(EXIT_FAILURE);
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const void *)&reuse, sizeof(reuse));

    memset(&local, 0, sizeof(local));
    local.sin_port = htons(connect_to);
    local.sin_addr.s_addr = INADDR_ANY;
    local.sin_family = AF_INET;

    if (bind(fd, (const struct sockaddr *)&local, sizeof(local)) < 0)
    {
        perror("unable to bind to port");
        exit(EXIT_FAILURE);
    }

    return fd;
}

/*
 * Sends the current value of `clk` as a decimal string in one UDP datagram.
 *
 * to  - destination UDP port on the local host
 * id  - descriptor of the socket to send from
 * clk - clock whose timer value is transmitted
 *
 * A failed send is reported on stderr; the function does not abort.
 */
void send_to_id(int to, int id, lamport_clock clk)
{
    struct sockaddr_in cl;
    char message[ML];

    /* bounded formatting: never writes past the end of message */
    snprintf(message, sizeof(message), "%d", clk.timer);

    memset(&cl, 0, sizeof(cl));
    cl.sin_family = AF_INET;
    cl.sin_addr.s_addr = inet_addr("127.0.0.1"); // Use localhost or set appropriate IP
    cl.sin_port = htons(to);

    if (sendto(id, (const char *)message, strlen(message), 0,
               (const struct sockaddr *)&cl, sizeof(cl)) < 0)
    {
        perror("sendto failed");
    }
}

int main(int argc, char *argv[])

7
{
int self = atoi(argv[1]);
int n_proc = atoi(argv[2]);
int phase = atoi(argv[3]);
int procs[MPROC];
int sock_id;
int new_time;
int itr, len, n, start_at;
char buff[ML];
struct sockaddr_in from;
lamport_clock self_clock;

for (itr = 0; itr < n_proc; itr++)


procs[itr] = atoi(argv[4 + itr]);

start_at = atoi(argv[4 + n_proc]) == 1 ? TRUE : FALSE;


init(&self_clock);
tick(&self_clock, phase);

printf("Creating a node at port %d with initiator flag %d\n", self, start_at);


sock_id = connect_to_port(self);

if (start_at == TRUE)
{
printf("Proc %d is starting comms\n", self);
for (itr = 0; itr < n_proc; itr++)
{
printf("Sending to proc: %d\n", procs[itr]);
send_to_id(procs[itr], sock_id, self_clock);
}
}

while (TRUE)
{
printf("\t -------------------------------------------- \n\n");
sleep(1);
tick(&self_clock, phase);

len = sizeof(from);
n = recvfrom(sock_id, (char *)buff, ML, MSG_WAITALL, (struct sockaddr *)&from, &len);

if (n > 0)
{
buff[n] = '\0';
printf("Received time: %s, Self time: %d\n", buff, self_clock.timer);
new_time = atoi(buff);

if (new_time > self_clock.timer)


{
printf("New time > Current time: synchronizing clocks\n");
printf("Current time: %d, Updated time: %d\n", self_clock.timer, new_time + 1);
update_clock(&self_clock, new_time + 1);
}
else
{
printf("No need to synchronize times\n");
}
}

for (itr = 0; itr < n_proc; itr++)

8
{
printf("Sending time %d to proc %d\n", self_clock.timer, procs[itr]);
send_to_id(procs[itr], sock_id, self_clock);
}
printf("\t -------------------------------------------- \n\n");
}
}

OUTPUT:
Process 1 (Port 5001) Process 2 (Port 5002)

9
Experiment-3
AIM:
Write a program to implement concept of Mutual Exclusion using centralized algorithm.

Theory:
 Mutual exclusion ensures that critical resources in distributed systems are accessed by one
process at a time, avoiding conflicts. The centralized algorithm elects one process as a
coordinator to handle resource requests, manage access control, and prevent race conditions.
 Process:

 A process requests access from the coordinator. If no other process is in the critical
section, access is granted; otherwise, the request is queued.
 After a process completes, it releases control, allowing the next request in the queue
to proceed.

 Advantages: Guarantees mutual exclusion and fairness, and ensures no process is left
waiting indefinitely (no starvation).

 Drawbacks: The coordinator becomes a single point of failure, which can halt the system if
it crashes, and may also create bottlenecks in high-load scenarios.

CODE:

File: client.c

#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <netdb.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <arpa/inet.h>
#include <unistd.h>
/*
 * Record stored in shared_mem.txt.
 *
 * The record is read and written as raw bytes with fread()/fwrite(), so its
 * layout must be identical in every program that touches the file. The
 * controller declares all four fields as int; the client uses the same field
 * types so both sides agree on the size and meaning of every byte.
 */
typedef struct resources
{
    int A;
    int B;
    int C;
    int D;
} resources;
/*
 * Mutual-exclusion client.
 *
 * Connects to the controller on TCP port 8888 of the local host and sends
 * "PING" until the controller answers "PONG" (access granted). While holding
 * access it reads the record from shared_mem.txt, increments every field,
 * writes the record back, then sends "DONE" to release the lock and exits.
 *
 * Exits with EXIT_FAILURE if the socket cannot be created or connected, or if
 * the controller closes the connection before granting access.
 */
int main(void)
{
    struct sockaddr_in sa; // Socket address data structure
    resources R;
    char obuff[256];       // reply received from the controller
    ssize_t snded, rec;
    int sockfd;
    int updated;
    FILE *f;

    sockfd = socket(PF_INET, SOCK_STREAM, 0); // New socket created
    if (sockfd < 0)
    {
        printf("Error in creation\n");
        exit(EXIT_FAILURE);
    }
    printf("Socket created\n");

    // Clearing and assigning type and address to the socket
    memset(&sa, 0, sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_port = htons(8888);
    sa.sin_addr.s_addr = htonl(INADDR_LOOPBACK); // controller runs on this machine

    // establishing and verifying the connection
    if (connect(sockfd, (struct sockaddr *)&sa, sizeof(sa)) < 0)
    {
        printf("Connection failed\n");
        close(sockfd);
        exit(EXIT_FAILURE);
    }
    printf("Connection made\n");

    while (1)
    {
        // 4 bytes: the terminating NUL is not part of the message
        snded = write(sockfd, "PING", 4);
        if (snded < 0)
        {
            printf("Write Error\n");
            close(sockfd);
            exit(EXIT_FAILURE);
        }
        printf("SENT PING\n");

        // Keep one byte free for the terminator; a result <= 0 means the
        // controller went away and must not be used as an index.
        rec = read(sockfd, obuff, sizeof(obuff) - 1);
        if (rec <= 0)
        {
            printf("Connection closed by controller\n");
            close(sockfd);
            exit(EXIT_FAILURE);
        }
        obuff[rec] = '\0';

        if (strcmp(obuff, "PONG") != 0)
        {
            usleep(750); // access denied: back off briefly before asking again
            continue;
        }

        usleep(750);

        // ---- critical section: read, modify, write the shared record ----
        updated = 0;
        f = fopen("shared_mem.txt", "r");
        if (f != NULL)
        {
            updated = (fread(&R, sizeof(R), 1, f) == 1);
            fclose(f);
        }
        if (updated)
        {
            printf("read %d, %d, %d, %d from server\n", R.A, R.B,
                   R.C, R.D);
            R.A += 1;
            R.B += 1;
            R.C += 1;
            R.D += 1;
            f = fopen("shared_mem.txt", "w");
            if (f != NULL)
            {
                updated = (fwrite(&R, sizeof(R), 1, f) == 1);
                fclose(f);
            }
            else
            {
                updated = 0;
            }
        }
        if (updated)
            printf("Got access to CS\n");
        else
            printf("Could not update shared_mem.txt\n");

        // Always release the lock, even when the update failed, so other
        // clients are not blocked forever.
        snded = write(sockfd, "DONE", 4);
        printf("Freeing Lock\n");
        break;
    }

    close(sockfd); // Closing the socket
    return 0;
}

11
File: controller.c

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <time.h>
#define TRUE 1
#define FALSE 0

/* Record kept in shared_mem.txt: four integer resources, stored as raw bytes. */
typedef struct resources
{
    int A, B, C, D;
} resources;

/* Reads the record from shared_mem.txt into *out. Returns 0 on success, -1 on failure. */
static int load_resources(resources *out)
{
    FILE *fp = fopen("shared_mem.txt", "r");
    int ok;

    if (fp == NULL)
        return -1;
    ok = (fread(out, sizeof(*out), 1, fp) == 1);
    fclose(fp);
    return ok ? 0 : -1;
}

/* Writes *in to shared_mem.txt, replacing its content. Returns 0 on success, -1 on failure. */
static int store_resources(const resources *in)
{
    FILE *fp = fopen("shared_mem.txt", "w");
    int ok;

    if (fp == NULL)
        return -1;
    ok = (fwrite(in, sizeof(*in), 1, fp) == 1);
    if (fclose(fp) != 0)
        ok = 0;
    return ok ? 0 : -1;
}

/*
 * Centralised mutual-exclusion controller.
 *
 * Initialises shared_mem.txt with {1, 2, 3, 4}, listens on TCP port 8888 and
 * multiplexes up to 50 clients with select(). Protocol per client:
 *   "PING" -> "PONG" when the lock is free (the client becomes the holder),
 *             "NACK" when another client holds it;
 *   "DONE" -> the holder releases the lock; the connection is closed.
 * When the lock holder disconnects without sending "DONE", the record is
 * restored to the snapshot taken when access was granted and the lock is
 * released, so the remaining clients are not blocked forever.
 *
 * Exits with EXIT_FAILURE on set-up errors or a permanent select() failure.
 */
int main(void)
{
    resources R, temp;
    struct sockaddr_in sa;   // Address the listening socket is bound to
    struct sockaddr_in peer; // Address of a disconnecting client (for logging)
    socklen_t peerlen;
    fd_set readfds;
    int clients[50];         // Connected client sockets, 0 marks a free slot
    char buff[256];          // Last message received from a client
    int opt = TRUE;
    int sockfd, new_sock, sd, max_sd, activity, i;
    int holder = -1;         // Socket of the client inside the critical section, -1 if none
    ssize_t rec;

    R.A = 1;
    R.B = 2;
    R.C = 3;
    R.D = 4;
    if (store_resources(&R) < 0)
    {
        printf("Error in creating shared_mem.txt\n");
        exit(EXIT_FAILURE);
    }
    temp = R;

    memset(clients, 0, sizeof(clients));

    sockfd = socket(AF_INET, SOCK_STREAM, 0); // New socket created
    if (sockfd < 0)
    {
        printf("Error in creating socket\n");
        exit(EXIT_FAILURE);
    }
    printf("Socket Created\n");

    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)) < 0)
    {
        printf("error\n");
    }

    // Clearing and assigning type and address to the socket
    memset(&sa, 0, sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_port = htons(8888);
    sa.sin_addr.s_addr = htonl(INADDR_ANY);

    // binding and verifying the socket to address
    if (bind(sockfd, (struct sockaddr *)&sa, sizeof(sa)) < 0)
    {
        printf("Bind Error\n");
        close(sockfd);
        exit(EXIT_FAILURE);
    }
    printf("Binded\n");

    // starts the server with a max client queue size set as 10
    if (listen(sockfd, 10) < 0)
    {
        printf("Listen Error\n");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    // server run
    while (TRUE)
    {
        // Rebuild the descriptor set: select() modifies it on every call
        FD_ZERO(&readfds);
        FD_SET(sockfd, &readfds);
        max_sd = sockfd;
        for (i = 0; i < 50; i++)
        {
            sd = clients[i];
            if (sd > 0)
            {
                FD_SET(sd, &readfds);
                if (sd > max_sd)
                    max_sd = sd;
            }
        }

        activity = select(max_sd + 1, &readfds, NULL, NULL, NULL);
        if (activity < 0)
        {
            if (errno == EINTR)
                continue; // interrupted by a signal, readfds is not valid
            printf("Select error\n");
            break;
        }

        if (FD_ISSET(sockfd, &readfds))
        {
            new_sock = accept(sockfd, (struct sockaddr *)NULL, NULL);
            if (new_sock < 0)
            {
                perror("accept");
            }
            else if (new_sock >= FD_SETSIZE)
            {
                // descriptors beyond FD_SETSIZE cannot be passed to select()
                printf("Too many clients, rejecting %d\n", new_sock);
                close(new_sock);
            }
            else
            {
                printf("New connection, sock fd %d\n", new_sock);
                for (i = 0; i < 50; i++)
                {
                    if (clients[i] == 0)
                    {
                        clients[i] = new_sock;
                        break;
                    }
                }
                if (i == 50)
                {
                    // no free slot: do not leak the descriptor
                    printf("Too many clients, rejecting %d\n", new_sock);
                    close(new_sock);
                }
            }
        }

        for (i = 0; i < 50; i++)
        {
            sd = clients[i];
            if (sd <= 0 || !FD_ISSET(sd, &readfds))
                continue;

            // keep one byte free for the terminator
            rec = read(sd, buff, sizeof(buff) - 1);
            if (rec <= 0)
            {
                peerlen = sizeof(peer);
                if (getpeername(sd, (struct sockaddr *)&peer, &peerlen) == 0)
                    printf("%d has disconnected unexpectedly with ip %s and port %d\n", sd,
                           inet_ntoa(peer.sin_addr), ntohs(peer.sin_port));
                else
                    printf("%d has disconnected unexpectedly\n", sd);

                if (sd == holder)
                {
                    // The holder vanished inside the critical section: undo
                    // any partial update and free the lock.
                    printf("recovering data\n");
                    if (store_resources(&temp) < 0)
                        printf("Error in recovering shared_mem.txt\n");
                    holder = -1;
                }
                close(sd);
                clients[i] = 0;
                continue;
            }

            buff[rec] = '\0';
            printf("received %s from %d\n", buff, sd);

            if (strcmp(buff, "PING") == 0 && holder != -1)
            {
                printf("Read buffer = %s, from %d and send NACK\n", buff, sd);
                if (write(sd, "NACK", 4) < 0)
                    perror("Send");
            }
            else if (strcmp(buff, "PING") == 0)
            {
                printf("Read Buffer = %s, from %d\n", buff, sd);
                // Snapshot the record so it can be restored if this client
                // disconnects before sending DONE.
                if (load_resources(&temp) < 0)
                    printf("Error in reading shared_mem.txt\n");
                holder = sd;
                if (write(sd, "PONG", 4) < 0)
                    perror("Send");
            }
            else if (strcmp(buff, "DONE") == 0)
            {
                if (sd == holder)
                {
                    printf("Lock freed\n");
                    holder = -1;
                }
                if (load_resources(&temp) == 0)
                    printf("Read %d, %d, %d, %d from %d\n", temp.A, temp.B, temp.C, temp.D, sd);
                clients[i] = 0;
                close(sd);
            }
        }
    }

    close(sockfd); // only reached when select() failed permanently
    return EXIT_FAILURE;
}

14
OUTPUT:

15
EXPERIMENT-4

AIM:
Write a program to implement Bully Election algorithm in Distributed systems

Theory:
This algorithm handles coordinator selection within distributed systems. When a coordinator
process fails, the system initiates an election to select a new one with the highest priority.
 Steps:

 A process detects coordinator failure and initiates an election by sending a message to


all higher-priority processes.
 If no higher-priority process responds, the initiating process assumes the role of
coordinator.
 If a higher-priority process exists and responds, it continues the election process until
the highest-priority process wins.

 Advantages: Ensures that the highest-priority node assumes control, making it suitable for
leader election in systems with defined hierarchy (e.g., banking or finance systems with
prioritized nodes).
 Drawbacks: Requires multiple message exchanges, and the process can be prolonged if
several nodes are involved.

CODE:
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <time.h>
#define MSG_CONFIRM 0
#define TRUE 1
#define FALSE 0
#define ML 1024
#define MPROC 32

/* Function to create a new connection to port 'connect_to' */


/*
 * Creates a UDP socket and binds it to local port `connect_to` on every
 * interface, so this node can receive election messages on that port.
 *
 * Terminates the process with EXIT_FAILURE when socket() or bind() fails.
 * Returns the descriptor of the bound socket.
 */
int connect_to_port(int connect_to)
{
    int sock_id;
    int opt = 1;
    struct sockaddr_in server;

    if ((sock_id = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
    {
        perror("unable to create a socket");
        exit(EXIT_FAILURE);
    }

    setsockopt(sock_id, SOL_SOCKET, SO_REUSEADDR, (const void *)&opt, sizeof(int));

    memset(&server, 0, sizeof(server));
    server.sin_family = AF_INET;
    server.sin_addr.s_addr = INADDR_ANY;
    server.sin_port = htons(connect_to);

    if (bind(sock_id, (const struct sockaddr *)&server, sizeof(server)) < 0)
    {
        perror("unable to bind to port");
        exit(EXIT_FAILURE);
    }
    return sock_id;
}

/* Sends a message to port id */


/*
 * Sends the NUL-terminated string `message` in one UDP datagram.
 *
 * to      - destination UDP port on the local host
 * id      - descriptor of the socket to send from
 * message - text to send (the terminator is not transmitted)
 *
 * The destination is the loopback address: INADDR_ANY is a wildcard for
 * binding and is not a portable destination address. A failed send is
 * reported on stderr; the function does not abort.
 */
void send_to_id(int to, int id, char message[ML])
{
    struct sockaddr_in cl;

    memset(&cl, 0, sizeof(cl));
    cl.sin_family = AF_INET;
    cl.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    cl.sin_port = htons(to);

    if (sendto(id, (const char *)message, strlen(message), MSG_CONFIRM,
               (const struct sockaddr *)&cl, sizeof(cl)) < 0)
    {
        perror("sendto failed");
    }
}

/* Starts the election, returns 1 if it wins the round */


/*
 * Runs one election round on behalf of process `self`.
 *
 * An "ELECTION" message is sent (from socket `id`) to every entry of `procs`
 * whose value is greater than `self`.
 *
 * Returns 1 when no such entry exists, i.e. `self` wins the round,
 * and 0 as soon as at least one higher process was contacted.
 */
int election(int id, int *procs, int num_procs, int self)
{
    char message[ML] = "ELECTION";
    int higher_seen = 0;
    int idx;

    for (idx = 0; idx < num_procs; idx++)
    {
        const int candidate = procs[idx];

        if (candidate <= self)
            continue; /* only higher-numbered processes are challenged */

        printf("sending election to: %d\n", candidate);
        send_to_id(candidate, id, message);
        higher_seen = 1;
    }

    return higher_seen ? 0 : 1;
}

/* Announces completion by sending coordinator messages */


/*
 * Announces the election result: sends a "COORDINATOR" message from socket
 * `id` to every entry of `procs` except `self`.
 */
void announce_completion(int id, int *procs, int num_procs, int self)
{
    int itr;
    char message[ML];

    strcpy(message, "COORDINATOR");

    for (itr = 0; itr < num_procs; itr += 1)
    {
        if (procs[itr] != self)
            send_to_id(procs[itr], id, message);
    }
}

/*
 * Bully-election node.
 *
 * Command line: <self> <n_proc> <proc1> ... <procN> <start_at>
 *   self     - UDP port (and id) of this node
 *   n_proc   - number of process ids that follow (0..MPROC)
 *   procK    - UDP ports (ids) of the processes
 *   start_at - 1 if this node starts the election
 *
 * After optionally starting an election the node loops forever on incoming
 * datagrams: "ELECTION" is acknowledged with "E-ACK" and triggers this node's
 * own election round, "E-ACK" is ignored, and "COORDINATOR" records the
 * sender's port as the new coordinator.
 */
int main(int argc, char *argv[])
{
    const char *usage = "Usage: %s <self> <n_proc> <proc1> <proc2> ... <start_at>\n";
    int procs[MPROC];
    char buff[ML], message[ML];
    struct sockaddr_in from;
    socklen_t len;
    int self, n_proc, sock_id, start_at, itr;
    int bully_id = -1; /* port of the current coordinator, -1 while unknown */
    ssize_t n;

    if (argc < 4)
    {
        fprintf(stderr, usage, argv[0]);
        exit(EXIT_FAILURE);
    }

    self = atoi(argv[1]);
    n_proc = atoi(argv[2]);

    if (n_proc > MPROC)
    {
        fprintf(stderr, "Error: n_proc exceeds maximum number of processes (%d)\n", MPROC);
        exit(EXIT_FAILURE);
    }
    /* argv[3 .. 2 + n_proc] are process ids, argv[3 + n_proc] is the initiator flag */
    if (n_proc < 0 || argc < 4 + n_proc)
    {
        fprintf(stderr, usage, argv[0]);
        exit(EXIT_FAILURE);
    }

    for (itr = 0; itr < n_proc; itr += 1)
        procs[itr] = atoi(argv[3 + itr]);

    start_at = atoi(argv[3 + n_proc]) == 1 ? TRUE : FALSE;

    // 1. Create socket
    printf("creating a node at %d %d \n", self, start_at);
    sock_id = connect_to_port(self);

    // 2. Check if process is initiator
    if (start_at == TRUE)
    {
        if (election(sock_id, procs, n_proc, self))
        {
            announce_completion(sock_id, procs, n_proc, self);
            printf("ANNOUNCING SELF AS NEW COORDINATOR\n");
        }
    }

    // 3. If not the initiator, wait for someone else
    while (TRUE)
    {
        len = sizeof(from); // Initialize len before calling recvfrom
        memset(&from, 0, sizeof(from));
        /* receive at most ML - 1 bytes so the terminator always fits */
        n = recvfrom(sock_id, (char *)buff, ML - 1, MSG_WAITALL, (struct sockaddr *)&from, &len);
        if (n < 0)
        {
            perror("recvfrom failed");
            continue;
        }
        buff[n] = '\0';
        printf("Received message: %s\n", buff);

        if (!strcmp(buff, "ELECTION"))
        {
            strcpy(message, "E-ACK"); // send election acknowledgment
            sendto(sock_id, (const char *)message, strlen(message), MSG_CONFIRM,
                   (const struct sockaddr *)&from, len);

            if (election(sock_id, procs, n_proc, self))
            {
                announce_completion(sock_id, procs, n_proc, self);
                printf("ANNOUNCING SELF AS NEW COORDINATOR\n");
            }
        }
        else if (!strcmp(buff, "E-ACK"))
        {
            continue; // nothing to do, your job is done
        }
        else if (!strcmp(buff, "COORDINATOR"))
        {
            bully_id = ntohs(from.sin_port); // sender's port identifies the new coordinator
            printf("New coordinator is %d\n", bully_id);
        }
    }
}

OUTPUT:

19
EXPERIMENT-5

AIM:
Write a program to implement Ring election algorithm.

Theory:
 In the ring election algorithm, processes are arranged in a logical or physical ring. Each
process knows its successor, allowing election messages to circulate around the ring until a
new coordinator is chosen.
 Process:

 The initiator sends an ELECTION message containing its ID to its successor.


 Each process appends its ID to the message and forwards it around the ring until it
returns to the initiator.
 The highest ID in the message becomes the new coordinator.

 Advantages: Provides a fault-tolerant approach with minimal communication, as messages


are only forwarded to successors.
 Drawbacks: Ring structures have latency issues, as a single failed process can disrupt the
ring, making message passing slower than direct communication.

Code:
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <time.h>
#define MSG_CONFIRM 0
#define TRUE 1
#define FALSE 0
#define ML 1024
#define MPROC 32

/* Local clock of one node: a single integer counter. */
typedef struct lamport_clock
{
    int timer; /* current time value */
} lamport_clock;

/* Resets the clock pointed to by clk to 0. */
void init(lamport_clock *clk)
{
    clk->timer = 0;
}

/* Advances the clock pointed to by clk by `phase` units. */
void tick(lamport_clock *clk, int phase)
{
    clk->timer += phase;
}

/*
 * Converts the leading decimal digits of `str` to an int.
 *
 * At most `n` characters are examined; conversion stops at the first
 * character that is not a decimal digit (including the terminating NUL).
 * Each digit is converted with (character - '0'); atoi() expects a string
 * and must not be handed a single character.
 *
 * Returns the converted value, or 0 when no digit was consumed.
 */
int str_to_int(const char *str, int n)
{
    int x = 0;
    int i;

    for (i = 0; i < n; i++)
    {
        if (str[i] < '0' || str[i] > '9')
            break;
        x = x * 10 + (str[i] - '0');
    }
    return x;
}

/* Applies a correction: adds new_time (an offset) to the time stored in *clk. */
void update_clock(lamport_clock *clk, int new_time)
{
    clk->timer += new_time;
}

/*
 * Opens a UDP socket bound to local port `connect_to` on all interfaces.
 *
 * The process is terminated with EXIT_FAILURE if the socket cannot be
 * created or bound. Returns the socket descriptor.
 */
int connect_to_port(int connect_to)
{
    struct sockaddr_in addr;
    int enable = 1;
    int fd;

    fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0)
    {
        perror("unable to create a socket");
        exit(EXIT_FAILURE);
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const void *)&enable, sizeof(enable));

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(connect_to);
    addr.sin_addr.s_addr = INADDR_ANY;

    if (bind(fd, (const struct sockaddr *)&addr, sizeof(addr)) < 0)
    {
        perror("unable to bind to port");
        exit(EXIT_FAILURE);
    }

    return fd;
}

/*
 * Sends the integer `diff` as a decimal string in one UDP datagram.
 *
 * to   - destination UDP port on the local host
 * id   - descriptor of the socket to send from
 * diff - value to transmit
 *
 * The destination is the loopback address: INADDR_ANY is a wildcard for
 * binding and is not a portable destination address. A failed send is
 * reported on stderr; the function does not abort.
 */
void send_to_id(int to, int id, int diff)
{
    struct sockaddr_in cl;
    char message[ML];

    /* bounded formatting: never writes past the end of message */
    snprintf(message, sizeof(message), "%d", diff);

    memset(&cl, 0, sizeof(cl));
    cl.sin_family = AF_INET;
    cl.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    cl.sin_port = htons(to);

    if (sendto(id, (const char *)message, strlen(message), MSG_CONFIRM,
               (const struct sockaddr *)&cl, sizeof(cl)) < 0)
    {
        perror("sendto failed");
    }
}

/*
 * Sends a "POLL" request in one UDP datagram.
 *
 * to - destination UDP port on the local host
 * id - descriptor of the socket to send from
 *
 * The destination is the loopback address: INADDR_ANY is a wildcard for
 * binding and is not a portable destination address. A failed send is
 * reported on stderr; the function does not abort.
 */
void send_poll(int to, int id)
{
    struct sockaddr_in cl;
    char message[ML];

    snprintf(message, sizeof(message), "%s", "POLL");

    memset(&cl, 0, sizeof(cl));
    cl.sin_family = AF_INET;
    cl.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    cl.sin_port = htons(to);

    if (sendto(id, (const char *)message, strlen(message), MSG_CONFIRM,
               (const struct sockaddr *)&cl, sizeof(cl)) < 0)
    {
        perror("sendto failed");
    }
}

/* Clock-synchronisation node: replies to POLL with its time and applies the correction it receives */


int main(int argc, char *argv[])
{
// 0. Initialize variables
int self = atoi(argv[1]);
int phase = atoi(argv[3]);
int server = atoi(argv[2]);
int times[MPROC];
int sock_id;
int avg = 0, diff = 0;
int new_time;
int itr, len, n, start_at;
char buff[ML], message[ML];
struct sockaddr_in from;
lamport_clock self_clock;
from.sin_family = AF_INET;
from.sin_addr.s_addr = htonl(INADDR_ANY);
init(&self_clock);
tick(&self_clock, phase);

// 1. Create socket
printf("creating a node at %d %d \n", self, start_at);
sock_id = connect_to_port(self);

// 3. If not the initiator, wait for someone else


while (TRUE)
{
printf("\t -------------------------------------------- \n\n");
sleep(2);
avg = 0;
tick(&self_clock, phase);
memset(&from, 0, sizeof(from));
n = recvfrom(sock_id, (char *)buff, ML, MSG_WAITALL, (struct sockaddr *)&from, &len);
buff[n] = '\0';

if (strcmp(buff, "POLL") == 0)
{
printf("Received Poll, Sending time to server\n");
send_to_id(server, sock_id, self_clock.timer);
printf("Time sent\n");
}
else
{
new_time = atoi(buff);
printf("Got clock corrections: %d, old time %d\n", new_time, self_clock.timer);
update_clock(&self_clock, new_time);
printf("Updated time, new time: %d\n", self_clock.timer);
exit(EXIT_SUCCESS);
}
printf("\t -------------------------------------------- \n\n");
}
}

22
OUTPUT:
Terminal 1 (Instance 1: self = 8001, server = 8002, phase = 1):

Terminal 2 (Instance 2: self = 8002, server = 8001, phase = 2):

Terminal 1 (Instance 1)

Terminal 2 (Instance 2)

23
EXPERIMENT-6

AIM:
Write a program to implement 2-Phase commit protocol
Theory:
The Two-Phase Commit Protocol is a distributed algorithm used to ensure all nodes in a
system agree to either commit or abort a transaction, achieving consistency across distributed
systems. It’s widely used in databases and distributed transactions.

The protocol operates in two main phases:

1. Prepare Phase:
o The coordinator (central controller) sends a "prepare" request to all
participating nodes (or "slaves") after confirming that each node has
completed its portion of the transaction locally.
o Each node responds with either "Ready" (indicating it is ready to commit) or
"Not Ready" (indicating it cannot commit, possibly due to an error or
conflict).
2. Commit/Abort Phase:
o If all nodes respond with "Ready," the coordinator sends a "Global Commit"
message to finalize the transaction across all nodes.
o If any node sends "Not Ready," the coordinator sends a "Global Abort"
message, instructing all nodes to abort the transaction.
o Each node then confirms the commit or abort with an acknowledgment back to
the coordinator.

This protocol is fault-tolerant, ensuring that even if one node fails or votes "Not Ready," no
partial commits occur, maintaining system integrity and preventing inconsistencies.

CODE:

File: server.c
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <time.h>

#define MSG_CONFIRM 0
#define TRUE 1
#define FALSE 0

24
#define ML 1024
#define MPROC 32

/*
 * Returns a UDP socket bound to local port `connect_to` (all interfaces).
 * Exits the process with EXIT_FAILURE if socket() or bind() fails.
 */
int connect_to_port(int connect_to)
{
    struct sockaddr_in bound;
    int on = 1;
    int s;

    s = socket(AF_INET, SOCK_DGRAM, 0);
    if (s < 0)
    {
        perror("unable to create a socket");
        exit(EXIT_FAILURE);
    }

    setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const void *)&on, sizeof(on));

    memset(&bound, 0, sizeof(bound));
    bound.sin_addr.s_addr = INADDR_ANY;
    bound.sin_family = AF_INET;
    bound.sin_port = htons(connect_to);

    if (bind(s, (const struct sockaddr *)&bound, sizeof(bound)) < 0)
    {
        perror("unable to bind to port");
        exit(EXIT_FAILURE);
    }

    return s;
}

/*
 * Sends the NUL-terminated string `message` in one UDP datagram.
 *
 * to      - destination UDP port on the local host
 * from    - descriptor of the socket to send from
 * message - text to send (the terminator is not transmitted)
 *
 * The destination is the loopback address: INADDR_ANY is a wildcard for
 * binding and is not a portable destination address. A failed send is
 * reported on stderr; the function does not abort.
 */
void send_to_id(int to, int from, char message[ML])
{
    struct sockaddr_in cl;

    memset(&cl, 0, sizeof(cl));
    cl.sin_family = AF_INET;
    cl.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    cl.sin_port = htons(to);

    if (sendto(from, (const char *)message, strlen(message), MSG_CONFIRM,
               (const struct sockaddr *)&cl, sizeof(cl)) < 0)
    {
        perror("sendto failed");
    }
}

/*
 * Opens the voting phase: sends "SCMT" from socket `id` to each of the
 * `num_procs` ports in `procs`, logging every destination.
 */
void begin_commit(int id, int *procs, int num_procs)
{
    char message[ML] = "SCMT";
    int idx = 0;

    while (idx < num_procs)
    {
        const int target = procs[idx];

        printf("Sending begin commit to: %d\n", target);
        send_to_id(target, id, message);
        idx++;
    }
}

/*
 * Broadcasts the coordinator's decision: sends `msg` from socket `self` to
 * each of the `num_procs` ports in `procs`.
 */
void announce_action(int self, int *procs, int num_procs, char msg[ML])
{
    int itr;

    for (itr = 0; itr < num_procs; itr++)
    {
        send_to_id(procs[itr], self, msg);
    }
}

int main(int argc, char *argv[])


{
int self = atoi(argv[1]);
int n_procs = atoi(argv[2]);
int procs[MPROC];
int sock_id, okcnt = 0, nocnt = 0, dncnt = 0;
int itr, len, n;
char buffer[ML];

struct sockaddr_in from;

for (itr = 0; itr < n_procs; itr += 1)


procs[itr] = atoi(argv[3 + itr]);

printf("Creating node at %d\n", self);


sock_id = connect_to_port(self);
begin_commit(sock_id, procs, n_procs);

while (TRUE)
{
sleep(2);
memset(&from, 0, sizeof(from));
n = recvfrom(sock_id, (char *)buffer, ML, MSG_WAITALL, (struct sockaddr *)&from, &len);
buffer[n] = '\0';
printf("Received: %s\n", buffer);

if (strcmp(buffer, "CMOK") == 0)
{
okcnt += 1;
}
else if (strcmp(buffer, "CMNO") == 0)
{
nocnt += 1;
}

if ((nocnt + okcnt) == n_procs)


{
printf("Received replies from all clients\n");
if (okcnt == n_procs)
{
printf("Announcing complete commit\n");
announce_action(sock_id, procs, n_procs, "CDON");
}
else
{
printf("Announcing abort commit\n");
announce_action(sock_id, procs, n_procs, "CABT");
}
}

if (strcmp(buffer, "DONE") == 0)
{
dncnt += 1;
printf("Clients confirmed commit\n");
if (dncnt == n_procs)
{
printf("All process announced commit action\n");

26
exit(EXIT_SUCCESS);
}
}
}
return 0;
}

File: client.c
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <time.h>

#define MSG_CONFIRM 0
#define TRUE 1
#define FALSE 0
#define ML 1024
#define MPROC 32

/*
 * Binds a fresh UDP socket to local port `connect_to` on all interfaces and
 * returns its descriptor. Any failure of socket() or bind() ends the process
 * with EXIT_FAILURE.
 */
int connect_to_port(int connect_to)
{
    int flag = 1;
    struct sockaddr_in self_addr;
    int handle = socket(AF_INET, SOCK_DGRAM, 0);

    if (handle < 0)
    {
        perror("unable to create a socket");
        exit(EXIT_FAILURE);
    }

    setsockopt(handle, SOL_SOCKET, SO_REUSEADDR, (const void *)&flag, sizeof(flag));

    memset(&self_addr, 0, sizeof(self_addr));
    self_addr.sin_family = AF_INET;
    self_addr.sin_port = htons(connect_to);
    self_addr.sin_addr.s_addr = INADDR_ANY;

    if (bind(handle, (const struct sockaddr *)&self_addr, sizeof(self_addr)) < 0)
    {
        perror("unable to bind to port");
        exit(EXIT_FAILURE);
    }

    return handle;
}

/*
 * Sends the NUL-terminated string `message` in one UDP datagram.
 *
 * to      - destination UDP port on the local host
 * from    - descriptor of the socket to send from
 * message - text to send (the terminator is not transmitted)
 *
 * The destination is the loopback address: INADDR_ANY is a wildcard for
 * binding and is not a portable destination address. A failed send is
 * reported on stderr; the function does not abort.
 */
void send_to_id(int to, int from, char message[ML])
{
    struct sockaddr_in cl;

    memset(&cl, 0, sizeof(cl));
    cl.sin_family = AF_INET;
    cl.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    cl.sin_port = htons(to);

    if (sendto(from, (const char *)message, strlen(message), MSG_CONFIRM,
               (const struct sockaddr *)&cl, sizeof(cl)) < 0)
    {
        perror("sendto failed");
    }
}

/*
 * Two-phase-commit participant.
 *
 * Command line: <self> <server> <action>
 *   self   - UDP port this participant binds to
 *   server - UDP port of the coordinator
 *   action - vote to cast: "CMOK" (ready) or "CMNO" (not ready)
 *
 * On "SCMT" the participant sends its vote to the coordinator. On "CDON" it
 * confirms with "DONE" and exits successfully; on "CABT" it confirms with
 * "DONE" and exits with EXIT_FAILURE.
 */
int main(int argc, char *argv[])
{
    char buffer[ML];
    struct sockaddr_in from;
    socklen_t len;
    char *action;
    int self, server, sock_id;
    ssize_t n;

    /* the coordinator only counts these two votes; anything else would stall it */
    if (argc < 4 || (strcmp(argv[3], "CMOK") != 0 && strcmp(argv[3], "CMNO") != 0))
    {
        fprintf(stderr, "Usage: %s <self> <server> <CMOK|CMNO>\n", argv[0]);
        exit(EXIT_FAILURE);
    }

    self = atoi(argv[1]);
    server = atoi(argv[2]);
    action = argv[3];

    printf("Creating node at %d\n", self);

    sock_id = connect_to_port(self);

    while (TRUE)
    {
        sleep(2);
        memset(&from, 0, sizeof(from));
        len = sizeof(from); /* recvfrom reads this as the size of `from` */
        /* receive at most ML - 1 bytes so the terminator always fits */
        n = recvfrom(sock_id, (char *)buffer, ML - 1, MSG_WAITALL, (struct sockaddr *)&from, &len);
        if (n < 0)
        {
            perror("recvfrom failed");
            continue;
        }
        buffer[n] = '\0';
        printf("Received: %s\n", buffer);

        if (strcmp(buffer, "SCMT") == 0)
        {
            printf("Sending %s to server\n", action);
            send_to_id(server, sock_id, action);
        }
        else if (strcmp(buffer, "CDON") == 0)
        {
            printf("Got complete commit, committing to logs\n");
            send_to_id(server, sock_id, "DONE");
            exit(EXIT_SUCCESS);
        }
        else if (strcmp(buffer, "CABT") == 0)
        {
            printf("Got abort commit, deleting updates\n");
            send_to_id(server, sock_id, "DONE");
            exit(EXIT_FAILURE);
        }
    }
    return 0;
}

28
OUTPUT:

>Coordinator Output

>Client Output:

Client 1 (on port 9001):

Client 2 (on port 9002):

29
EXPERIMENT-7
AIM:
Write a program to implement 3-Phase commit protocol

Theory:
The 3-Phase Commit Protocol is a distributed transaction protocol designed to ensure that
all participants in a transaction either commit or abort the transaction in a coordinated
manner. It is an extension of the well-known 2-Phase Commit (2PC) protocol, with an
additional phase that reduces the chances of participants getting blocked due to failures.

Key Phases:

1. Pre-Commit Phase:
o The coordinator sends a PREPARE message to all participants, asking them if
they are ready to commit the transaction.
o Participants perform local checks (such as locking resources) and respond with
either an ACK (if ready) or NO (if not ready).
2. Commit Phase:
o If the coordinator receives ACK messages from all participants, it sends a
COMMIT message to all participants, signaling that the transaction can be
committed.
o Participants respond with a COMMIT_ACK to confirm that they have committed
the transaction.
3. Final Commit Phase:
o If any participant fails to acknowledge or there is a failure in communication,
the coordinator sends an ABORT message to all participants, instructing them to
abort the transaction.
o This phase ensures that in case of failure, the system can recover without
leaving transactions in an uncertain state.

Advantages of 3-Phase Commit:

 Non-blocking Recovery: The additional phase ensures that if a participant or


coordinator fails during the protocol, they can recover without being stuck in an
uncertain state.
 Improved Failure Handling: The 3-Phase Commit protocol introduces more
resilience to network and system failures compared to the 2-Phase Commit protocol.

Disadvantages:

 Complexity: The protocol adds an extra phase, which can increase the complexity of
implementation and message traffic.
 Latency: The added communication can result in higher latency for transactions.

30
CODE:
File: coordinator.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>

#define PORT 8080


#define MAX_PARTICIPANTS 3
#define BUFFER_SIZE 1024

/*
 * Print msg followed by the description of the current errno value
 * (via perror) and terminate the process with exit status 1.
 */
void handle_error(const char *msg)
{
    perror(msg);
    exit(1);
}

/*
 * Coordinator side of the simplified commit demo (UDP, port PORT).
 *
 * Flow:
 *   1. Bind a datagram socket on all local interfaces.
 *   2. Receive one datagram from each of MAX_PARTICIPANTS senders, remember
 *      the sender's address and answer it with "ACK".  The payload of these
 *      datagrams is not inspected.
 *   3. Send "COMMIT" to every remembered address.
 *   4. Receive MAX_PARTICIPANTS replies and count the "COMMIT_ACK" ones.
 *
 * Returns 0 when every reply was a COMMIT_ACK, 1 otherwise.  A failing
 * socket call terminates the process through handle_error().
 */
int main(void)
{
    struct sockaddr_in server_addr;
    struct sockaddr_in participants[MAX_PARTICIPANTS];
    socklen_t participant_len[MAX_PARTICIPANTS];
    char buffer[BUFFER_SIZE];
    int committed = 0;
    int sockfd;

    // Step 1: Create a socket
    sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (sockfd < 0)
    {
        handle_error("Socket creation failed");
    }

    // Zero the whole structure so padding bytes are well defined.
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(PORT);

    // Step 2: Bind the socket
    if (bind(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0)
    {
        handle_error("Bind failed");
    }

    printf("Coordinator waiting for requests...\n");

    // Step 3: Wait for "prepare" messages from participants
    for (int i = 0; i < MAX_PARTICIPANTS; i++)
    {
        // recvfrom() treats the length as in/out, so reset it for every call.
        participant_len[i] = sizeof(participants[i]);

        // Each sender's address is stored separately so that the COMMIT
        // below reaches every participant, not just the last one heard from.
        if (recvfrom(sockfd, buffer, sizeof(buffer) - 1, 0,
                     (struct sockaddr *)&participants[i], &participant_len[i]) < 0)
        {
            handle_error("Receiving prepare failed");
        }
        printf("Received Prepare from participant %d\n", i + 1);

        // Send acknowledgment
        if (sendto(sockfd, "ACK", strlen("ACK"), 0,
                   (struct sockaddr *)&participants[i], participant_len[i]) < 0)
        {
            handle_error("Sending ACK failed");
        }
    }

    printf("Received all prepares. Sending commit requests...\n");

    // Step 4: Send "commit" to all participants
    for (int i = 0; i < MAX_PARTICIPANTS; i++)
    {
        if (sendto(sockfd, "COMMIT", strlen("COMMIT"), 0,
                   (struct sockaddr *)&participants[i], participant_len[i]) < 0)
        {
            handle_error("Sending COMMIT failed");
        }
    }

    // Step 5: Wait for "commit" responses from all participants
    for (int i = 0; i < MAX_PARTICIPANTS; i++)
    {
        ssize_t received = recvfrom(sockfd, buffer, sizeof(buffer) - 1, 0, NULL, NULL);
        if (received < 0)
        {
            handle_error("Receiving commit response failed");
        }

        // Datagram payloads carry no terminator; add one before strcmp().
        buffer[received] = '\0';

        if (strcmp(buffer, "COMMIT_ACK") == 0)
        {
            printf("Participant %d committed.\n", i + 1);
            committed++;
        }
    }

    close(sockfd);

    // Only report success when every participant confirmed the commit.
    if (committed != MAX_PARTICIPANTS)
    {
        fprintf(stderr, "Transaction not committed: %d of %d participants acknowledged.\n",
                committed, MAX_PARTICIPANTS);
        return 1;
    }

    printf("Transaction committed successfully.\n");
    return 0;
}

File: participant.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>

#define COORDINATOR_IP "127.0.0.1"


#define PORT 8080
#define BUFFER_SIZE 1024

/*
 * Print msg followed by the description of the current errno value
 * (via perror) and terminate the process with exit status 1.
 */
void handle_error(const char *msg)
{
    perror(msg);
    exit(1);
}

/*
 * Participant side of the simplified commit demo (UDP).
 *
 * Flow:
 *   1. Send "PREPARE" to the coordinator at COORDINATOR_IP:PORT.
 *   2. Wait for the reply; continue only if it is "ACK".
 *   3. Wait for the next message; if it is "COMMIT", print the commit
 *      message and answer with "COMMIT_ACK".
 *
 * Returns 0 when the commit was acknowledged, 1 when the coordinator sent
 * an unexpected message.  A failing socket call terminates the process
 * through handle_error().
 */
int main(void)
{
    struct sockaddr_in server_addr;
    char buffer[BUFFER_SIZE];
    ssize_t received;
    int status = 1;
    int sockfd;

    // Step 1: Create a socket
    sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (sockfd < 0)
    {
        handle_error("Socket creation failed");
    }

    // Zero the whole structure so padding bytes are well defined.
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(PORT);
    server_addr.sin_addr.s_addr = inet_addr(COORDINATOR_IP);

    // Step 2: Send "prepare" to the coordinator
    if (sendto(sockfd, "PREPARE", strlen("PREPARE"), 0,
               (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0)
    {
        handle_error("Sending PREPARE failed");
    }

    // Step 3: Wait for coordinator's acknowledgment
    // The sender address is not requested, so server_addr keeps pointing
    // at the configured coordinator for the later COMMIT_ACK.
    received = recvfrom(sockfd, buffer, sizeof(buffer) - 1, 0, NULL, NULL);
    if (received < 0)
    {
        handle_error("Receiving ACK failed");
    }
    // Datagram payloads carry no terminator; add one before strcmp().
    buffer[received] = '\0';

    if (strcmp(buffer, "ACK") == 0)
    {
        printf("Coordinator prepared. Ready to commit.\n");

        // Step 4: Wait for commit from the coordinator
        received = recvfrom(sockfd, buffer, sizeof(buffer) - 1, 0, NULL, NULL);
        if (received < 0)
        {
            handle_error("Receiving COMMIT failed");
        }
        buffer[received] = '\0';

        if (strcmp(buffer, "COMMIT") == 0)
        {
            printf("Committing transaction.\n");
            if (sendto(sockfd, "COMMIT_ACK", strlen("COMMIT_ACK"), 0,
                       (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0)
            {
                handle_error("Sending COMMIT_ACK failed");
            }
            status = 0;
        }
    }

    if (status != 0)
    {
        // Make a protocol mismatch visible instead of exiting silently.
        fprintf(stderr, "Unexpected message from coordinator: %s\n", buffer);
    }

    close(sockfd);
    return status;
}

OUTPUT:

>Coordinator Terminal:

At this point, the coordinator has received PREPARE messages from all three participants and is ready to send
the COMMIT requests to all participants.

33
The coordinator now waits for the COMMIT_ACK responses from all participants.

Participant 1 Terminal:

Participant 2 Terminal:

Participant 3 Terminal:

***

34

You might also like