Merge branch 'master' of github.com:BitShares/fc

This commit is contained in:
Daniel Larimer 2015-02-23 09:06:16 -05:00
commit cf6f81dd5b
5 changed files with 5011 additions and 7 deletions

View file

@ -164,6 +164,7 @@ set( fc_sources
src/network/url.cpp
src/network/gntp.cpp
src/compress/smaz.cpp
src/compress/zlib.cpp
src/compress/lzma.cpp
vendor/cyoencode-1.0.2/src/CyoDecode.c
vendor/cyoencode-1.0.2/src/CyoEncode.c

View file

@ -0,0 +1,10 @@
#pragma once
#include <fc/string.hpp>
namespace fc
{
string zlib_compress(const string& in);
} // namespace fc

4916
src/compress/miniz.c Normal file

File diff suppressed because it is too large Load diff

15
src/compress/zlib.cpp Normal file
View file

@ -0,0 +1,15 @@
#include <fc/compress/zlib.hpp>
#include "miniz.c"
namespace fc
{
string zlib_compress(const string& in)
{
size_t compressed_message_length;
char* compressed_message = (char*)tdefl_compress_mem_to_heap(in.c_str(), in.size(), &compressed_message_length, TDEFL_WRITE_ZLIB_HEADER | TDEFL_DEFAULT_MAX_PROBES);
string result(compressed_message, compressed_message_length);
free(compressed_message);
return result;
}
}

View file

@ -4,12 +4,12 @@
#include <fc/exception/exception.hpp>
#include <fc/log/gelf_appender.hpp>
#include <fc/reflect/variant.hpp>
#include <fc/thread/scoped_lock.hpp>
#include <fc/thread/thread.hpp>
#include <fc/variant.hpp>
#include <fc/io/json.hpp>
#include <fc/crypto/city.hpp>
#include <fc/compress/zlib.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/lexical_cast.hpp>
#include <iomanip>
#include <queue>
@ -24,7 +24,6 @@ namespace fc
config cfg;
optional<ip::endpoint> gelf_endpoint;
udp_socket gelf_socket;
boost::mutex socket_mutex;
impl(const config& c) :
cfg(c)
@ -128,12 +127,75 @@ namespace fc
gelf_message["_task_name"] = context.get_task_name();
string gelf_message_as_string = json::to_string(gelf_message);
std::shared_ptr<char> send_buffer(new char[gelf_message_as_string.size()], [](char* p){ delete[] p; });
memcpy(send_buffer.get(), gelf_message_as_string.c_str(), gelf_message_as_string.size());
unsigned uncompressed_size = gelf_message_as_string.size();
gelf_message_as_string = zlib_compress(gelf_message_as_string);
// graylog2 expects the zlib header to be 0x78 0x9c
// but miniz.c generates 0x78 0x01 (indicating
// low compression instead of default compression)
// so change that here
assert(gelf_message_as_string[0] == (char)0x78);
if (gelf_message_as_string[1] == (char)0x01 ||
gelf_message_as_string[1] == (char)0xda)
gelf_message_as_string[1] = (char)0x9c;
assert(gelf_message_as_string[1] == (char)0x9c);
// packets are sent by UDP, and they tend to disappear if they
// get too large. It's hard to find any solid numbers on how
// large they can be before they get dropped -- datagrams can
// be up to 64k, but anything over 512 is not guaranteed.
// You can play with this number, intermediate values like
// 1400 and 8100 are likely to work on most intranets.
const unsigned max_payload_size = 512;
if (gelf_message_as_string.size() <= max_payload_size)
{
scoped_lock<boost::mutex> lock(my->socket_mutex);
my->gelf_socket.send_to(send_buffer, gelf_message_as_string.size(), *my->gelf_endpoint);
// no need to split
std::shared_ptr<char> send_buffer(new char[gelf_message_as_string.size()],
[](char* p){ delete[] p; });
memcpy(send_buffer.get(), gelf_message_as_string.c_str(),
gelf_message_as_string.size());
my->gelf_socket.send_to(send_buffer, gelf_message_as_string.size(),
*my->gelf_endpoint);
}
else
{
// split the message
// we need to generate an 8-byte ID for this message.
// city hash should do
uint64_t message_id = city_hash64(gelf_message_as_string.c_str(), gelf_message_as_string.size());
const unsigned header_length = 2 /* magic */ + 8 /* msg id */ + 1 /* seq */ + 1 /* count */;
const unsigned body_length = max_payload_size - header_length;
unsigned total_number_of_packets = (gelf_message_as_string.size() + body_length - 1) / body_length;
unsigned bytes_sent = 0;
unsigned number_of_packets_sent = 0;
while (bytes_sent < gelf_message_as_string.size())
{
unsigned bytes_to_send = std::min((unsigned)gelf_message_as_string.size() - bytes_sent,
body_length);
std::shared_ptr<char> send_buffer(new char[max_payload_size],
[](char* p){ delete[] p; });
char* ptr = send_buffer.get();
// magic number for chunked message
*(unsigned char*)ptr++ = 0x1e;
*(unsigned char*)ptr++ = 0x0f;
// message id
memcpy(ptr, (char*)&message_id, sizeof(message_id));
ptr += sizeof(message_id);
*(unsigned char*)(ptr++) = number_of_packets_sent;
*(unsigned char*)(ptr++) = total_number_of_packets;
memcpy(ptr, gelf_message_as_string.c_str() + bytes_sent,
bytes_to_send);
my->gelf_socket.send_to(send_buffer, header_length + bytes_to_send,
*my->gelf_endpoint);
++number_of_packets_sent;
bytes_sent += bytes_to_send;
}
assert(number_of_packets_sent == total_number_of_packets);
}
}
} // fc