From 50638ea3eb61b41d1137a88e0530cace9ebcebfb Mon Sep 17 00:00:00 2001 From: Michael Cooper Date: Sun, 9 Dec 2012 20:52:26 +1100 Subject: [PATCH 1/8] Adding LinkedList2 that uses intrusive nodes --- LinkedList.h | 57 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 56 insertions(+), 1 deletion(-) diff --git a/LinkedList.h b/LinkedList.h index 5b519a8..3161337 100644 --- a/LinkedList.h +++ b/LinkedList.h @@ -55,7 +55,7 @@ class LinkedList { Node* ret = n->next; delete n; length -= 1; - return n->next; + return ret; } Node* last; @@ -63,4 +63,59 @@ class LinkedList { uint16_t length; }; + +template +class LinkedList2{ +public: + LinkedList2():last(NULL),first(NULL),length(0){} + + void Add(NodeType* item) { + if (last == NULL) { + last = item; + first = item; + item->next = NULL; + item->prev = NULL; + } else { + last->next = item; + item->prev = last; + item->next = NULL; + last = item; + } + length += 1; + } + + template + NodeType* Find(SearchParamType input, bool(*search_func)(const NodeType*, SearchParamType)) + { + NodeType* current = first; + while (current != NULL) { + if (search_func(current, input)) + return current; + current = current->next; + } + return NULL; + } + + NodeType* Remove(NodeType* n) { + if (!n) + return NULL; + if (first == n) + first = n->next; + if (last == n) + last = n->prev; + if (n->prev) + n->prev->next = n->next; + if (n->next) + n->next->prev = n->prev; + NodeType* ret = n->next; + delete n; + length -= 1; + return ret; + } + + NodeType* last; + NodeType* first; + uint16_t length; +}; + #endif From c479b8e02271a4865086fc20afc0e6ac50ea85cf Mon Sep 17 00:00:00 2001 From: Michael Cooper Date: Sun, 9 Dec 2012 20:52:51 +1100 Subject: [PATCH 2/8] Adding packet re-assemlby There appears to still be a bug in either splitting or re-assembly --- MeshBase.cpp | 110 +++++++++++++++++++++++++++++++++++++++++---------- MeshBase.h | 29 ++++++++++---- RF_test.ino | 7 +++- 3 files changed, 116 insertions(+), 30 deletions(-) diff --git a/MeshBase.cpp b/MeshBase.cpp index b14946f..12780ce 100644 --- a/MeshBase.cpp +++ b/MeshBase.cpp @@ -3,7 +3,7 @@ #include "MeshBase.h" #define MAX_PACKET_SIZE 32 -#define MAX_PAYLOAD_SIZE (MAX_PACKET_SIZE - sizeof(Message)) +#define MAX_PAYLOAD_SIZE (MAX_PACKET_SIZE - sizeof(MeshBase::MessageHeader)) // -- Broadcast addresses -- #define PEER_DISCOVERY 1 @@ -28,7 +28,6 @@ void MeshBase::Begin() radio.begin(); radio.enableDynamicPayloads(); radio.setRetries(2,1); - //radio.openReadingPipe(0, TO_ADDRESS(address)); radio.openReadingPipe(1, TO_BROADCAST(PEER_DISCOVERY)); radio.setAutoAck(0, true); radio.setAutoAck(1, false); @@ -46,8 +45,7 @@ void MeshBase::Update() } // Recieve - uint8_t pipe_num; - if (radio.available(&pipe_num)) + if (radio.available()) { bool done = false; do { @@ -78,31 +76,93 @@ void MeshBase::Update() } } +bool FindStream(const MeshBase::Message* current, const MeshBase::MessageHeader* find) +{ + if (current->header.address_from != find->address_from) + return false; + if (current->header.msg_id != find->msg_id) + return false; + return true; +} + +void MeshBase::Message::AddPart(const void* payload, uint8_t len, uint8_t part_num, bool more_parts) +{ + uint8_t start_pos = part_num * MAX_PAYLOAD_SIZE; + uint8_t end_pos = len + (part_num * MAX_PAYLOAD_SIZE); + Serial.print(" R AddPart() : Adding part. start_pos="); + Serial.print(start_pos); + Serial.print(" end_pos="); + Serial.print(end_pos); + Serial.print(" len="); + Serial.print(len); + Serial.print(" part_num="); + Serial.print(part_num); + Serial.print(" more_parts="); + Serial.println(more_parts); + if (data == NULL) + data = malloc(end_pos); + if (end_pos > data_used) + data = realloc(data, end_pos); + memcpy(&static_cast(data)[start_pos], payload, len); + if (end_pos > data_used) + data_used = end_pos; + blocks_recieved += 1; + if (!more_parts) { + header.split_more = false; + header.split_part = part_num; + } +} + +bool MeshBase::Message::IsDone() const +{ + // We set the split_more to false if we recieved the last packet + // in the stream, and split_part to total number of blocks in the stream. + // So if split_more is false, and we have the right number of blocks_recieved + // we are good to go. + Serial.print(" R IsDone() : split_more="); + Serial.print(header.split_more); + Serial.print(" split_part="); + Serial.print(header.split_part); + Serial.print(" blocks_recieved="); + Serial.println(blocks_recieved); + if (!header.split_more && blocks_recieved >= header.split_part) + return true; + Serial.println(" R IsDone() : False"); + return false; +} + +MeshBase::Message::~Message() { + free(data); +} + void MeshBase::HandlePacket(const byte* data, uint8_t len) { - if (len < sizeof(Message)) + if (len < sizeof(MessageHeader)) return; - const MeshBase::Message* msg = (struct MeshBase::Message*)data; - uint8_t payload_length = len - sizeof(Message); - const byte* payload = data + sizeof(Message); - if (msg->split_more || msg->split_part != 0) - { - // Re-assembly needed - // TODO: Re-assemble packets - } else { - switch(msg->type) { + const MeshBase::MessageHeader* header = (struct MeshBase::MessageHeader*)data; + uint8_t payload_length = len - sizeof(MessageHeader); + const byte* payload = data + sizeof(MessageHeader); + Message* s = assembly_list.Find(header, &FindStream); + if (s == NULL) { + s = new Message(*header); + assembly_list.Add(s); + } + s->AddPart(payload, payload_length, header->split_part, header->split_more); + if (s->IsDone()) { + Serial.println(" R IsDone() : true!!"); + switch(header->type) { case type_peer_discovery: - HandlePeerDiscovery(msg, payload, payload_length); + HandlePeerDiscovery(&(s->header), s->data, s->data_used); break; default: - OnMessage(msg, payload, payload_length); + OnMessage(&(s->header), s->data, s->data_used); break; } - delete data; + assembly_list.Remove(s); } } -void MeshBase::HandlePeerDiscovery(const MeshBase::Message* msg, const void* buff, uint8_t length) +void MeshBase::HandlePeerDiscovery(const MeshBase::MessageHeader* msg, const void* buff, uint8_t length) { if (length != sizeof(PeerDiscoveryMessage)) return; @@ -143,12 +203,14 @@ void MeshBase::SendPeerDiscovery() void MeshBase::SendMessage(uint32_t to, uint8_t type, const void* data, uint8_t length, bool is_broadcast) { + static uint8_t current_msg_id = 0; byte buff[MAX_PACKET_SIZE]; - Message* msg = (struct Message*)buff; + MessageHeader* msg = (struct MessageHeader*)buff; msg->protocol_version = 1; msg->ttl = 0; msg->type = type; msg->address_from = address; + msg->msg_id = current_msg_id++; uint8_t num_pkts = (length / MAX_PAYLOAD_SIZE) + 1; for (uint8_t num = 0; num < num_pkts; ++num) @@ -156,15 +218,21 @@ void MeshBase::SendMessage(uint32_t to, uint8_t type, const void* data, uint8_t uint8_t remaining_length = length - (num * MAX_PAYLOAD_SIZE); msg->split_part = num; msg->split_more = remaining_length > MAX_PAYLOAD_SIZE; - memcpy(buff + sizeof(Message), (const byte*)data + (num * MAX_PAYLOAD_SIZE), min(remaining_length, MAX_PAYLOAD_SIZE)); + memcpy(buff + sizeof(MessageHeader), (const byte*)data + (num * MAX_PAYLOAD_SIZE), min(remaining_length, MAX_PAYLOAD_SIZE)); radio.stopListening(); if (is_broadcast) radio.openWritingPipe(TO_BROADCAST(to)); else radio.openWritingPipe(TO_ADDRESS(to)); - radio.write(buff, min(remaining_length, MAX_PAYLOAD_SIZE)); + radio.write(buff, min(remaining_length + sizeof(MessageHeader), MAX_PAYLOAD_SIZE)); radio.startListening(); + Serial.print(" T Sending pkt split_part="); + Serial.print(msg->split_part); + Serial.print(" split_more="); + Serial.print(msg->split_more); + Serial.print(" length="); + Serial.println(min(remaining_length, MAX_PAYLOAD_SIZE)); } } diff --git a/MeshBase.h b/MeshBase.h index 76a2f7e..63cd4e8 100644 --- a/MeshBase.h +++ b/MeshBase.h @@ -17,8 +17,8 @@ class MeshBase uint16_t time; Peer(uint32_t address) : address(address), time(0) {} }; - - struct Message + + struct MessageHeader { uint8_t protocol_version : 4; uint8_t ttl : 4; @@ -29,6 +29,20 @@ class MeshBase uint32_t address_from; } PACKED; + struct Message { + Message(const MessageHeader& a) : header(a), data(NULL), data_used(0), blocks_recieved(0), next(0), prev(0) {} + ~Message(); + MessageHeader header; + void* data; + uint8_t data_used; + uint8_t blocks_recieved; + Message* next; + Message* prev; + + void AddPart(const void* data, uint8_t len, uint8_t part_num, bool more_parts); + bool IsDone() const; + }; + // -- Message types -- enum message_type { type_peer_discovery, @@ -42,7 +56,7 @@ class MeshBase uint32_t GetAddress() const { return address; } bool IsReady() const { return address != 0; } protected: - virtual void OnMessage(const MeshBase::Message* meta, const void* data, uint8_t length) = 0; + virtual void OnMessage(const MessageHeader* meta, const void* data, uint8_t length) = 0; virtual void OnNewPeer(Peer*) {} virtual void OnLostPeer(Peer*) {} private: @@ -53,14 +67,15 @@ class MeshBase void SendPeerDiscovery(); void SendMessage(uint32_t address, uint8_t type, const void* data, uint8_t length, bool is_broadcast); - void HandlePeerDiscovery(const Message* msg, const void* buff, uint8_t length); + void HandlePeerDiscovery(const MessageHeader* msg, const void* buff, uint8_t length); void HandlePacket(const byte* data, uint8_t length); void ChooseAddress(); - + LinkedList peers; - + LinkedList2 assembly_list; + Peer* GetPeer(uint32_t address); - + struct PeerDiscoveryMessage { uint8_t protocol_version; diff --git a/RF_test.ino b/RF_test.ino index 6c94368..f3f7926 100644 --- a/RF_test.ino +++ b/RF_test.ino @@ -7,17 +7,19 @@ class App : public MeshBase public: App() : MeshBase(9, 10) {} protected: - virtual void OnMessage(const MeshBase::Message* meta, const void* data, uint8_t length) + virtual void OnMessage(const MeshBase::MessageHeader* meta, const void* data, uint8_t length) { Serial.print(meta->address_from, DEC); Serial.print(" : "); Serial.println((const char*)data); + Serial.print("split_part = "); + Serial.println(meta->split_part, DEC); } virtual void OnNewPeer(Peer* p) { if (!IsReady()) return; char buff[255]; - int len = snprintf(buff, 255, "Hello %u", p->address); + int len = snprintf(buff, 255, "Hello %u. This is a super long message for some reason. Please keep adding to this message for great good and other things. I would very much like it if this could be put together.", p->address); Serial.print("Me : "); Serial.println(buff); SendMessage(p->address, type_user, buff, len + 1); @@ -39,3 +41,4 @@ void loop() app.Update(); delay(100); } + From fb08ebfe457a5d8f1403801d61089b4e16334452 Mon Sep 17 00:00:00 2001 From: Michael Cooper Date: Sun, 9 Dec 2012 22:07:15 +1100 Subject: [PATCH 3/8] Bugfix: was capping at PAYLOAD instead of PACKET. oops. --- MeshBase.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MeshBase.cpp b/MeshBase.cpp index 12780ce..d107118 100644 --- a/MeshBase.cpp +++ b/MeshBase.cpp @@ -225,7 +225,7 @@ void MeshBase::SendMessage(uint32_t to, uint8_t type, const void* data, uint8_t radio.openWritingPipe(TO_BROADCAST(to)); else radio.openWritingPipe(TO_ADDRESS(to)); - radio.write(buff, min(remaining_length + sizeof(MessageHeader), MAX_PAYLOAD_SIZE)); + radio.write(buff, min(remaining_length + sizeof(MessageHeader), MAX_PACKET_SIZE)); radio.startListening(); Serial.print(" T Sending pkt split_part="); Serial.print(msg->split_part); From 1465f447f900499445ee377311c3e9357b2ec7be Mon Sep 17 00:00:00 2001 From: Michael Cooper Date: Mon, 31 Dec 2012 14:49:47 +1100 Subject: [PATCH 4/8] Fixing isDone() calculation --- MeshBase.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/MeshBase.cpp b/MeshBase.cpp index d107118..bbf24dd 100644 --- a/MeshBase.cpp +++ b/MeshBase.cpp @@ -124,10 +124,13 @@ bool MeshBase::Message::IsDone() const Serial.print(" split_part="); Serial.print(header.split_part); Serial.print(" blocks_recieved="); - Serial.println(blocks_recieved); - if (!header.split_more && blocks_recieved >= header.split_part) + Serial.print(blocks_recieved); + if (!header.split_more && blocks_recieved > header.split_part) + { + Serial.println(" - True **"); return true; - Serial.println(" R IsDone() : False"); + } + Serial.println(" - False"); return false; } @@ -149,7 +152,6 @@ void MeshBase::HandlePacket(const byte* data, uint8_t len) } s->AddPart(payload, payload_length, header->split_part, header->split_more); if (s->IsDone()) { - Serial.println(" R IsDone() : true!!"); switch(header->type) { case type_peer_discovery: HandlePeerDiscovery(&(s->header), s->data, s->data_used); From 28eaec7d93a1cfac9d3f87c86dd042e665d032e6 Mon Sep 17 00:00:00 2001 From: Michael Cooper Date: Mon, 31 Dec 2012 15:09:28 +1100 Subject: [PATCH 5/8] Tweaking for reliability. --- MeshBase.cpp | 28 ++++++++++++++++++++-------- MeshBase.h | 5 ++--- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/MeshBase.cpp b/MeshBase.cpp index bbf24dd..496724c 100644 --- a/MeshBase.cpp +++ b/MeshBase.cpp @@ -27,7 +27,7 @@ void MeshBase::Begin() { radio.begin(); radio.enableDynamicPayloads(); - radio.setRetries(2,1); + radio.setRetries(4,2); radio.openReadingPipe(1, TO_BROADCAST(PEER_DISCOVERY)); radio.setAutoAck(0, true); radio.setAutoAck(1, false); @@ -145,6 +145,9 @@ void MeshBase::HandlePacket(const byte* data, uint8_t len) const MeshBase::MessageHeader* header = (struct MeshBase::MessageHeader*)data; uint8_t payload_length = len - sizeof(MessageHeader); const byte* payload = data + sizeof(MessageHeader); + if (header->protocol_version != 1) + return; + Message* s = assembly_list.Find(header, &FindStream); if (s == NULL) { s = new Message(*header); @@ -221,20 +224,29 @@ void MeshBase::SendMessage(uint32_t to, uint8_t type, const void* data, uint8_t msg->split_part = num; msg->split_more = remaining_length > MAX_PAYLOAD_SIZE; memcpy(buff + sizeof(MessageHeader), (const byte*)data + (num * MAX_PAYLOAD_SIZE), min(remaining_length, MAX_PAYLOAD_SIZE)); + uint8_t wire_size = min(remaining_length + sizeof(MessageHeader), MAX_PACKET_SIZE); radio.stopListening(); + bool result = true; if (is_broadcast) radio.openWritingPipe(TO_BROADCAST(to)); else radio.openWritingPipe(TO_ADDRESS(to)); - radio.write(buff, min(remaining_length + sizeof(MessageHeader), MAX_PACKET_SIZE)); + if (is_broadcast) { + //radio.startWrite(buff, wire_size); + result = radio.write(buff, wire_size); + } else { + result = radio.write(buff, wire_size); + } radio.startListening(); - Serial.print(" T Sending pkt split_part="); - Serial.print(msg->split_part); - Serial.print(" split_more="); - Serial.print(msg->split_more); - Serial.print(" length="); - Serial.println(min(remaining_length, MAX_PAYLOAD_SIZE)); + if (!is_broadcast) + { + Serial.print(" T Sending pkt split_part="); + Serial.print(msg->split_part); + Serial.print(" result="); + Serial.println(result); + } + delay(100); } } diff --git a/MeshBase.h b/MeshBase.h index 63cd4e8..e3c8be0 100644 --- a/MeshBase.h +++ b/MeshBase.h @@ -18,8 +18,7 @@ class MeshBase Peer(uint32_t address) : address(address), time(0) {} }; - struct MessageHeader - { + struct MessageHeader { uint8_t protocol_version : 4; uint8_t ttl : 4; uint8_t msg_id; @@ -53,6 +52,7 @@ class MeshBase void Begin(); void Update(); void SendMessage(uint32_t address, uint8_t type, const void* data, uint8_t length); + void SendMessage(uint32_t address, uint8_t type, const void* data, uint8_t length, bool is_broadcast); uint32_t GetAddress() const { return address; } bool IsReady() const { return address != 0; } protected: @@ -66,7 +66,6 @@ class MeshBase unsigned long last_peer_check_time; void SendPeerDiscovery(); - void SendMessage(uint32_t address, uint8_t type, const void* data, uint8_t length, bool is_broadcast); void HandlePeerDiscovery(const MessageHeader* msg, const void* buff, uint8_t length); void HandlePacket(const byte* data, uint8_t length); void ChooseAddress(); From d9013270cc7c974d8f2b66ddf0792caceceaa870 Mon Sep 17 00:00:00 2001 From: Michael Cooper Date: Mon, 31 Dec 2012 19:16:46 +1100 Subject: [PATCH 6/8] Tweaking --- MeshBase.cpp | 46 ++++++++++++++++++++-------------------------- 1 file changed, 20 insertions(+), 26 deletions(-) diff --git a/MeshBase.cpp b/MeshBase.cpp index 496724c..5b10c97 100644 --- a/MeshBase.cpp +++ b/MeshBase.cpp @@ -12,9 +12,9 @@ #define TO_BROADCAST(x) (0xBB00000000LL + x) #define TO_ADDRESS(x) (0xAA00000000LL + x) -#define PEER_DISCOVERY_TIME 3000 +#define PEER_DISCOVERY_TIME 4000 #define PEER_CHECK_TIME 4000 -#define PEER_TIMEOUT 2 +#define PEER_TIMEOUT 3 MeshBase::MeshBase(uint8_t ce, uint8_t cs) : radio(ce, cs) @@ -89,16 +89,6 @@ void MeshBase::Message::AddPart(const void* payload, uint8_t len, uint8_t part_n { uint8_t start_pos = part_num * MAX_PAYLOAD_SIZE; uint8_t end_pos = len + (part_num * MAX_PAYLOAD_SIZE); - Serial.print(" R AddPart() : Adding part. start_pos="); - Serial.print(start_pos); - Serial.print(" end_pos="); - Serial.print(end_pos); - Serial.print(" len="); - Serial.print(len); - Serial.print(" part_num="); - Serial.print(part_num); - Serial.print(" more_parts="); - Serial.println(more_parts); if (data == NULL) data = malloc(end_pos); if (end_pos > data_used) @@ -119,17 +109,16 @@ bool MeshBase::Message::IsDone() const // in the stream, and split_part to total number of blocks in the stream. // So if split_more is false, and we have the right number of blocks_recieved // we are good to go. - Serial.print(" R IsDone() : split_more="); - Serial.print(header.split_more); - Serial.print(" split_part="); - Serial.print(header.split_part); - Serial.print(" blocks_recieved="); - Serial.print(blocks_recieved); - if (!header.split_more && blocks_recieved > header.split_part) - { - Serial.println(" - True **"); + if (!header.split_more && blocks_recieved > header.split_part) { + if (blocks_recieved > 1) { + Serial.print(" R IsDone() : id="); + Serial.print(header.msg_id); + Serial.println(" - True **"); + } return true; } + Serial.print(" R IsDone() : id="); + Serial.print(header.msg_id); Serial.println(" - False"); return false; } @@ -237,16 +226,21 @@ void MeshBase::SendMessage(uint32_t to, uint8_t type, const void* data, uint8_t result = radio.write(buff, wire_size); } else { result = radio.write(buff, wire_size); - } - radio.startListening(); - if (!is_broadcast) - { + if (result == false) { + // Issue transmitting packet, retry? + radio.startListening(); + delay(100); + radio.stopListening(); + result = radio.write(buff, wire_size); + } Serial.print(" T Sending pkt split_part="); Serial.print(msg->split_part); + Serial.print(" id="); + Serial.print(msg->msg_id); Serial.print(" result="); Serial.println(result); } - delay(100); + radio.startListening(); } } From dd63ba77b6195bcb1b99c15bec1d274273108bc9 Mon Sep 17 00:00:00 2001 From: Michael Cooper Date: Fri, 11 Jan 2013 00:47:57 +1100 Subject: [PATCH 7/8] Begin publish/subscribe framework Adding support for application capabilities Discard incomplete messages after a while --- MeshBase.cpp | 74 ++++++++++++++++++++---------- MeshBase.h | 54 +++++++++++++--------- Publisher.h | 48 +++++++++++++++++++ RF_test.ino | 44 ------------------ examples/Publisher/Publisher.ino | 33 +++++++++++++ examples/Subscriber/Subscriber.ino | 64 ++++++++++++++++++++++++++ 6 files changed, 227 insertions(+), 90 deletions(-) create mode 100644 Publisher.h delete mode 100644 RF_test.ino create mode 100644 examples/Publisher/Publisher.ino create mode 100644 examples/Subscriber/Subscriber.ino diff --git a/MeshBase.cpp b/MeshBase.cpp index 5b10c97..90157e5 100644 --- a/MeshBase.cpp +++ b/MeshBase.cpp @@ -13,14 +13,16 @@ #define TO_ADDRESS(x) (0xAA00000000LL + x) #define PEER_DISCOVERY_TIME 4000 -#define PEER_CHECK_TIME 4000 +#define CHECK_TIME 4000 #define PEER_TIMEOUT 3 +#define ASSEMBLY_TIMEOUT 2 MeshBase::MeshBase(uint8_t ce, uint8_t cs) : radio(ce, cs) , address(0) , last_broadcast_time(0) -, last_peer_check_time(0) +, last_check_time(0) +, application_capabilities(0) {} void MeshBase::Begin() @@ -56,23 +58,47 @@ void MeshBase::Update() } while (!done); } - // Update peers - if (millis() - last_peer_check_time > PEER_CHECK_TIME) + // Do periodic checks + if (millis() - last_check_time > CHECK_TIME) { - LinkedList::Node* current = peers.first; - while(current != NULL) + // Check for expired peers { - current->item->time += 1; - if (current->item->time >= PEER_TIMEOUT) + LinkedList::Node* current = peers.first; + while(current != NULL) { - Serial.print("Lost Peer: "); - Serial.println(current->item->address, DEC); - current = peers.Remove(current); - } else { - current = current->next; + current->item->time += 1; + if (current->item->time >= PEER_TIMEOUT) + { + Serial.print("Lost Peer: "); + Serial.println(current->item->address, DEC); + current = peers.Remove(current); + } else { + current = current->next; + } } } - last_peer_check_time = millis(); + + // Check for expired packets + { + Message* current = assembly_list.first; + while(current != NULL) + { + current->age += 1; + if (current->age >= ASSEMBLY_TIMEOUT) + { + Serial.print("Dropped partial message. address="); + Serial.print(current->header.address_from, DEC); + Serial.print(" msg_id="); + Serial.print(current->header.msg_id); + Serial.print(" blocks_recieved="); + Serial.println(current->blocks_recieved); + current = assembly_list.Remove(current); + } else { + current = current->next; + } + } + } + last_check_time = millis(); } } @@ -101,6 +127,7 @@ void MeshBase::Message::AddPart(const void* payload, uint8_t len, uint8_t part_n header.split_more = false; header.split_part = part_num; } + age = 0; } bool MeshBase::Message::IsDone() const @@ -110,16 +137,8 @@ bool MeshBase::Message::IsDone() const // So if split_more is false, and we have the right number of blocks_recieved // we are good to go. if (!header.split_more && blocks_recieved > header.split_part) { - if (blocks_recieved > 1) { - Serial.print(" R IsDone() : id="); - Serial.print(header.msg_id); - Serial.println(" - True **"); - } return true; } - Serial.print(" R IsDone() : id="); - Serial.print(header.msg_id); - Serial.println(" - False"); return false; } @@ -175,11 +194,12 @@ void MeshBase::HandlePeerDiscovery(const MeshBase::MessageHeader* msg, const voi Serial.print(" num_peers="); Serial.println(pd->num_peers, DEC); Peer* p = new Peer(msg->address_from); + p->Update(pd); peers.Add(p); OnNewPeer(p); } else { // Existing peer, reset timer - peer->time = 0; + peer->Update(pd); } } @@ -189,7 +209,7 @@ void MeshBase::SendPeerDiscovery() MeshBase::PeerDiscoveryMessage payload; payload.protocol_version = 1; payload.network_capabilities = 0; - payload.application_capabilities = 0; + payload.application_capabilities = application_capabilities; payload.num_peers = peers.length; payload.uptime = millis() / 1000; SendMessage(PEER_DISCOVERY, type_peer_discovery, &payload, sizeof(payload), true); @@ -273,3 +293,9 @@ MeshBase::Peer* MeshBase::GetPeer(uint32_t a) return NULL; } +void MeshBase::Peer::Update(const PeerDiscoveryMessage* msg) +{ + application_capabilities = msg->application_capabilities; + time = 0; +} + diff --git a/MeshBase.h b/MeshBase.h index e3c8be0..2d59b91 100644 --- a/MeshBase.h +++ b/MeshBase.h @@ -7,17 +7,13 @@ #define PACKED __attribute__ ((packed)) +typedef uint32_t address_t; + class MeshBase { public: MeshBase(uint8_t ce, uint8_t cs); - struct Peer { - uint32_t address; - uint16_t time; - Peer(uint32_t address) : address(address), time(0) {} - }; - struct MessageHeader { uint8_t protocol_version : 4; uint8_t ttl : 4; @@ -25,11 +21,11 @@ class MeshBase bool split_more : 1; uint8_t split_part : 7; uint8_t type; - uint32_t address_from; + address_t address_from; } PACKED; struct Message { - Message(const MessageHeader& a) : header(a), data(NULL), data_used(0), blocks_recieved(0), next(0), prev(0) {} + Message(const MessageHeader& a) : header(a), data(NULL), data_used(0), blocks_recieved(0), next(NULL), prev(NULL), age(0) {} ~Message(); MessageHeader header; void* data; @@ -37,33 +33,56 @@ class MeshBase uint8_t blocks_recieved; Message* next; Message* prev; + uint8_t age; void AddPart(const void* data, uint8_t len, uint8_t part_num, bool more_parts); bool IsDone() const; }; // -- Message types -- - enum message_type { + enum MessageType { type_peer_discovery, type_peer_list, type_user, }; + enum ApplicationCapabilities { + capability_publish_events = 1 >> 0, + }; + void Begin(); void Update(); - void SendMessage(uint32_t address, uint8_t type, const void* data, uint8_t length); - void SendMessage(uint32_t address, uint8_t type, const void* data, uint8_t length, bool is_broadcast); - uint32_t GetAddress() const { return address; } + void SendMessage(address_t address, uint8_t type, const void* data, uint8_t length); + void SendMessage(address_t address, uint8_t type, const void* data, uint8_t length, bool is_broadcast); + address_t GetAddress() const { return address; } bool IsReady() const { return address != 0; } protected: + struct PeerDiscoveryMessage + { + uint8_t protocol_version; + uint8_t network_capabilities; // What routing/networking can I do for the network + uint8_t application_capabilities; // What type of data do I expose + uint16_t num_peers; // Number of direct peers + uint32_t uptime; // Seconds since boot + } PACKED; + + struct Peer { + Peer(uint32_t address) : address(address), time(0), application_capabilities(0) {} + uint32_t address; + uint8_t time; + uint8_t application_capabilities; + void Update(const PeerDiscoveryMessage* msg); + }; + virtual void OnMessage(const MessageHeader* meta, const void* data, uint8_t length) = 0; virtual void OnNewPeer(Peer*) {} virtual void OnLostPeer(Peer*) {} + uint8_t application_capabilities; private: uint32_t address; RF24 radio; unsigned long last_broadcast_time; - unsigned long last_peer_check_time; + unsigned long last_check_time; void SendPeerDiscovery(); void HandlePeerDiscovery(const MessageHeader* msg, const void* buff, uint8_t length); @@ -75,15 +94,6 @@ class MeshBase Peer* GetPeer(uint32_t address); - struct PeerDiscoveryMessage - { - uint8_t protocol_version; - uint8_t network_capabilities; // What routing/networking can I do for the network - uint8_t application_capabilities; // What type of data do I expose - uint16_t num_peers; // Number of direct peers - uint32_t uptime; // Seconds since boot - } PACKED; - }; #endif diff --git a/Publisher.h b/Publisher.h new file mode 100644 index 0000000..bbbd91f --- /dev/null +++ b/Publisher.h @@ -0,0 +1,48 @@ +#ifndef PUBLISHER_H +#define PUBLISHER_H + +#include +#include "MeshBase.h" + +class PublishApp : public MeshBase +{ +public: + PublishApp() : MeshBase(9, 10) + { + application_capabilities |= MeshBase::capability_publish_events; + } + + void OnEvent(uint8_t event_data) + { + const Target* current = targets.first; + while (current != NULL) + { + SendMessage(current->address, type_on_event, &event_data, sizeof(event_data)); + current = current->next; + } + } + + enum PublishMessageType { + type_on_event = MeshBase::type_user, + type_subscribe, + }; +protected: + virtual void OnMessage(const MeshBase::MessageHeader* meta, const void* data, uint8_t length) + { + if (meta->type == type_subscribe) + { + targets.Add(new Target(meta->address_from)); + } + } +private: + struct Target + { + Target(address_t target) : address(target), prev(NULL), next(NULL) {} + address_t address; + Target* prev; + Target* next; + }; + LinkedList2 targets; +}; + +#endif // PUBLISHER_H diff --git a/RF_test.ino b/RF_test.ino deleted file mode 100644 index f3f7926..0000000 --- a/RF_test.ino +++ /dev/null @@ -1,44 +0,0 @@ -#include -#include "RF24.h" -#include "MeshBase.h" - -class App : public MeshBase -{ -public: - App() : MeshBase(9, 10) {} -protected: - virtual void OnMessage(const MeshBase::MessageHeader* meta, const void* data, uint8_t length) - { - Serial.print(meta->address_from, DEC); - Serial.print(" : "); - Serial.println((const char*)data); - Serial.print("split_part = "); - Serial.println(meta->split_part, DEC); - } - virtual void OnNewPeer(Peer* p) - { - if (!IsReady()) return; - char buff[255]; - int len = snprintf(buff, 255, "Hello %u. This is a super long message for some reason. Please keep adding to this message for great good and other things. I would very much like it if this could be put together.", p->address); - Serial.print("Me : "); - Serial.println(buff); - SendMessage(p->address, type_user, buff, len + 1); - } -}; - -App app; - -void setup() -{ - Serial.begin(9600); - Serial.println("Starting RF_TEST"); - randomSeed(analogRead(0)); - app.Begin(); -} - -void loop() -{ - app.Update(); - delay(100); -} - diff --git a/examples/Publisher/Publisher.ino b/examples/Publisher/Publisher.ino new file mode 100644 index 0000000..a022f4c --- /dev/null +++ b/examples/Publisher/Publisher.ino @@ -0,0 +1,33 @@ +#include +#include "RF24.h" +#include "MeshBase.h" +#include "Publisher.h" +#include "LinkedList.h" + +PublishApp app; + +unsigned long last_time; +uint8_t sequence; + +void setup() +{ + Serial.begin(19200); + Serial.println("Starting..."); + randomSeed(analogRead(0)); + app.Begin(); + last_time = millis(); + sequence = 0; +} + +void loop() +{ + app.Update(); + delay(100); + if (millis() - last_time > 10000) + { + app.OnEvent(sequence); + ++sequence; + last_time = millis(); + } +} + diff --git a/examples/Subscriber/Subscriber.ino b/examples/Subscriber/Subscriber.ino new file mode 100644 index 0000000..ec213ae --- /dev/null +++ b/examples/Subscriber/Subscriber.ino @@ -0,0 +1,64 @@ +#include +#include "RF24.h" +#include "MeshBase.h" +#include "Publisher.h" +#include "LinkedList.h" + +class SubscribeApp : public MeshBase +{ +public: + SubscribeApp() : MeshBase(9, 10) {} + + enum PublishMessageType { + type_on_event = MeshBase::type_user, + type_subscribe, + }; +protected: + virtual void OnMessage(const MeshBase::MessageHeader* meta, const void* data, uint8_t length) + { + if (meta->type == type_on_event) + { + if (length != sizeof(uint8_t)) + { + Serial.println("LENGTH WRONG"); + return; + } + OnEvent(meta->address_from, *(uint8_t*)data); + } + } + + virtual void OnNewPeer(Peer* p) + { + if (p->application_capabilities & MeshBase::capability_publish_events) + { + Serial.print("Found peer that has events! address="); + Serial.println(p->address); + SendMessage(p->address, type_subscribe, "", 1); + } + } + + virtual void OnEvent(address_t address, uint8_t event_data) + { + Serial.print("Event from "); + Serial.print(address); + Serial.print(" data: "); + Serial.println(event_data); + } +}; + +SubscribeApp app; + +void setup() +{ + Serial.begin(19200); + Serial.println("Starting..."); + randomSeed(analogRead(0)); + app.Begin(); +} + +void loop() +{ + app.Update(); + delay(100); +} + From 4f45079aa7005c68a28e08ac623fba12aa1510bf Mon Sep 17 00:00:00 2001 From: Michael Cooper Date: Fri, 11 Jan 2013 00:49:20 +1100 Subject: [PATCH 8/8] Adding gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b25c15b --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*~