Fix GELF logging to split long messages, use compression
This commit is contained in:
parent
ec66863902
commit
30e52b6b01
5 changed files with 5017 additions and 4 deletions
|
|
@ -164,6 +164,7 @@ set( fc_sources
|
|||
src/network/url.cpp
|
||||
src/network/gntp.cpp
|
||||
src/compress/smaz.cpp
|
||||
src/compress/zlib.cpp
|
||||
src/compress/lzma.cpp
|
||||
vendor/cyoencode-1.0.2/src/CyoDecode.c
|
||||
vendor/cyoencode-1.0.2/src/CyoEncode.c
|
||||
|
|
|
|||
10
include/fc/compress/zlib.hpp
Normal file
10
include/fc/compress/zlib.hpp
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
#pragma once
|
||||
|
||||
#include <fc/string.hpp>
|
||||
|
||||
namespace fc
|
||||
{
|
||||
|
||||
string zlib_compress(const string& in);
|
||||
|
||||
} // namespace fc
|
||||
4916
src/compress/miniz.c
Normal file
4916
src/compress/miniz.c
Normal file
File diff suppressed because it is too large
Load diff
15
src/compress/zlib.cpp
Normal file
15
src/compress/zlib.cpp
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
#include <fc/compress/zlib.hpp>
|
||||
|
||||
#include "miniz.c"
|
||||
|
||||
namespace fc
|
||||
{
|
||||
string zlib_compress(const string& in)
|
||||
{
|
||||
size_t compressed_message_length;
|
||||
char* compressed_message = (char*)tdefl_compress_mem_to_heap(in.c_str(), in.size(), &compressed_message_length, TDEFL_WRITE_ZLIB_HEADER | TDEFL_DEFAULT_MAX_PROBES);
|
||||
string result(compressed_message, compressed_message_length);
|
||||
free(compressed_message);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
|
@ -8,6 +8,8 @@
|
|||
#include <fc/thread/thread.hpp>
|
||||
#include <fc/variant.hpp>
|
||||
#include <fc/io/json.hpp>
|
||||
#include <fc/crypto/city.hpp>
|
||||
#include <fc/compress/zlib.hpp>
|
||||
|
||||
#include <boost/thread/mutex.hpp>
|
||||
#include <boost/lexical_cast.hpp>
|
||||
|
|
@ -128,12 +130,81 @@ namespace fc
|
|||
gelf_message["_task_name"] = context.get_task_name();
|
||||
|
||||
string gelf_message_as_string = json::to_string(gelf_message);
|
||||
std::shared_ptr<char> send_buffer(new char[gelf_message_as_string.size()], [](char* p){ delete[] p; });
|
||||
memcpy(send_buffer.get(), gelf_message_as_string.c_str(), gelf_message_as_string.size());
|
||||
unsigned uncompressed_size = gelf_message_as_string.size();
|
||||
gelf_message_as_string = zlib_compress(gelf_message_as_string);
|
||||
|
||||
// graylog2 expects the zlib header to be 0x78 0x9c
|
||||
// but miniz.c generates 0x78 0x01 (indicating
|
||||
// low compression instead of default compression)
|
||||
// so change that here
|
||||
assert(gelf_message_as_string[0] == (char)0x78);
|
||||
if (gelf_message_as_string[1] == (char)0x01 ||
|
||||
gelf_message_as_string[1] == (char)0xda)
|
||||
gelf_message_as_string[1] = (char)0x9c;
|
||||
assert(gelf_message_as_string[1] == (char)0x9c);
|
||||
|
||||
// packets are sent by UDP, and they tend to disappear if they
|
||||
// get too large. It's hard to find any solid numbers on how
|
||||
// large they can be before they get dropped -- datagrams can
|
||||
// be up to 64k, but anything over 512 is not guaranteed.
|
||||
// You can play with this number, intermediate values like
|
||||
// 1400 and 8100 are likely to work on most intranets.
|
||||
const unsigned max_payload_size = 512;
|
||||
|
||||
if (gelf_message_as_string.size() <= max_payload_size)
|
||||
{
|
||||
// no need to split
|
||||
std::shared_ptr<char> send_buffer(new char[gelf_message_as_string.size()],
|
||||
[](char* p){ delete[] p; });
|
||||
memcpy(send_buffer.get(), gelf_message_as_string.c_str(),
|
||||
gelf_message_as_string.size());
|
||||
|
||||
{
|
||||
scoped_lock<boost::mutex> lock(my->socket_mutex);
|
||||
my->gelf_socket.send_to(send_buffer, gelf_message_as_string.size(), *my->gelf_endpoint);
|
||||
my->gelf_socket.send_to(send_buffer, gelf_message_as_string.size(),
|
||||
*my->gelf_endpoint);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// split the message
|
||||
// we need to generate an 8-byte ID for this message.
|
||||
// city hash should do
|
||||
uint64_t message_id = city_hash64(gelf_message_as_string.c_str(), gelf_message_as_string.size());
|
||||
const unsigned header_length = 2 /* magic */ + 8 /* msg id */ + 1 /* seq */ + 1 /* count */;
|
||||
const unsigned body_length = max_payload_size - header_length;
|
||||
unsigned total_number_of_packets = (gelf_message_as_string.size() + body_length - 1) / body_length;
|
||||
unsigned bytes_sent = 0;
|
||||
unsigned number_of_packets_sent = 0;
|
||||
while (bytes_sent < gelf_message_as_string.size())
|
||||
{
|
||||
unsigned bytes_to_send = std::min((unsigned)gelf_message_as_string.size() - bytes_sent,
|
||||
body_length);
|
||||
|
||||
std::shared_ptr<char> send_buffer(new char[max_payload_size],
|
||||
[](char* p){ delete[] p; });
|
||||
char* ptr = send_buffer.get();
|
||||
// magic number for chunked message
|
||||
*(unsigned char*)ptr++ = 0x1e;
|
||||
*(unsigned char*)ptr++ = 0x0f;
|
||||
|
||||
// message id
|
||||
memcpy(ptr, (char*)&message_id, sizeof(message_id));
|
||||
ptr += sizeof(message_id);
|
||||
|
||||
*(unsigned char*)(ptr++) = number_of_packets_sent;
|
||||
*(unsigned char*)(ptr++) = total_number_of_packets;
|
||||
memcpy(ptr, gelf_message_as_string.c_str() + bytes_sent,
|
||||
bytes_to_send);
|
||||
{
|
||||
scoped_lock<boost::mutex> lock(my->socket_mutex);
|
||||
my->gelf_socket.send_to(send_buffer, header_length + bytes_to_send,
|
||||
*my->gelf_endpoint);
|
||||
}
|
||||
++number_of_packets_sent;
|
||||
bytes_sent += bytes_to_send;
|
||||
}
|
||||
assert(number_of_packets_sent == total_number_of_packets);
|
||||
}
|
||||
}
|
||||
} // fc
|
||||
|
|
|
|||
Loading…
Reference in a new issue