/*
 * Copyright 2014 Splunk, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"): you may
 * not use this file except in compliance with the License. You may obtain
 * a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

using Serilog;
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Security.Authentication;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;

namespace Admin.Core.Serilog.Es
{
    /// <summary>
    /// TcpSocketWriter encapsulates queueing strings to be written to a TCP socket
    /// and handling reconnections (through an
    /// <see cref="ExponentialBackoffTcpReconnectionPolicy"/>) when a TCP session drops.
    /// </summary>
    /// <remarks>
    /// TcpSocketWriter maintains a fixed sized queue of strings to be sent via
    /// the TCP port and, while the socket is open, sends them as quickly as possible.
    ///
    /// If a write fails with an <see cref="IOException"/> or a <see cref="SocketException"/>,
    /// the writer stops pulling strings off the queue until the reconnection policy has
    /// re-established a connection. Those exceptions are passed to
    /// <see cref="LoggingFailureHandler"/>.
    ///
    /// If connecting fails with any exception the reconnection policy does not retry
    /// (anything other than <see cref="SocketException"/>), LoggingFailureHandler is
    /// invoked and no further attempt to write anything is made.
    /// </remarks>
    public class TcpSocketWriter : IDisposable
    {
        private readonly FixedSizeQueue<string> _eventQueue;
        private readonly ExponentialBackoffTcpReconnectionPolicy _reconnectPolicy = new ExponentialBackoffTcpReconnectionPolicy();
        private readonly CancellationTokenSource _tokenSource;

        // Must be private or Dispose will not function properly.
        private readonly TaskCompletionSource<bool> _disposed = new TaskCompletionSource<bool>();

        // Only touched by the background writer task.
        private Stream _stream;

        // 0 until Dispose has been called once; guards the one-time cancel/cleanup.
        private int _disposeRequested;

        /// <summary>
        /// Event that is invoked when writing to, or reconnecting, the TCP session fails.
        /// </summary>
        public event Action<Exception> LoggingFailureHandler = ex =>
        {
            UnexpectedErrorLogger(
                ex,
                (x, socketError) =>
                {
                    if (socketError == null)
                    {
                        //Log.Error(x, "failure inside TCP socket: {message}", x.Message);
                    }
                    else
                    {
                        //Log.Error(
                        //    x,
                        //    "failure inside TCP socket: {message} - socket error found {socketErrorCode}",
                        //    x.Message,
                        //    socketError);
                    }
                });
        };

        /// <summary>
        /// Walks <paramref name="ex"/> and its chain of inner exceptions looking for the first
        /// <see cref="SocketException"/>, then calls <paramref name="log"/> with the original
        /// exception and that socket error code (null when the chain contains none).
        /// </summary>
        /// <param name="ex">The exception to inspect; a null value yields a null error code.</param>
        /// <param name="log">Callback receiving the original exception and the socket error code found, if any.</param>
        public static void UnexpectedErrorLogger(Exception ex, Action<Exception, SocketError?> log)
        {
            SocketError? socketErrorCode = null;

            for (var current = ex; current != null; current = current.InnerException)
            {
                var socketException = current as SocketException;
                if (socketException != null)
                {
                    socketErrorCode = socketException.SocketErrorCode;
                    break;
                }
            }

            log(ex, socketErrorCode);
        }

        /// <summary>
        /// Construct a TCP socket writer that writes to the host and port of the given URI.
        /// A URI scheme of "tls" wraps the connection in an <see cref="SslStream"/>.
        /// </summary>
        /// <param name="uri">Uri to open a TCP socket to.</param>
        /// <param name="maxQueueSize">The maximum number of log entries to queue before starting to drop entries.</param>
        public TcpSocketWriter(Uri uri, int maxQueueSize = 5000)
        {
            _eventQueue = new FixedSizeQueue<string>(maxQueueSize);
            _tokenSource = new CancellationTokenSource();

            var threadReady = new TaskCompletionSource<bool>();

            // The loop is asynchronous, so a dedicated (LongRunning) thread would only
            // live until the first await; a plain pool work item is equivalent.
            Task.Run(() => RunWriterLoopAsync(uri, threadReady));

            // Give the first connection attempt up to five seconds before returning.
            threadReady.Task.Wait(TimeSpan.FromSeconds(5));
        }

        /// <summary>
        /// Cancels the writer, waits until it has written what is still queued and
        /// released the connection, then releases the cancellation source.
        /// Calling it more than once is safe.
        /// </summary>
        public void Dispose()
        {
            if (Interlocked.Exchange(ref _disposeRequested, 1) != 0)
            {
                // Another call already requested shutdown; just wait for it to complete.
                _disposed.Task.Wait();
                return;
            }

            // Tell the writer task to stop the queue from accepting entries and write
            // what it has before cleaning up, then wait until that cleanup is finished.
            _tokenSource.Cancel();
            _disposed.Task.Wait();
            _tokenSource.Dispose();
        }

        /// <summary>
        /// Push a string onto the queue to be written.
        /// </summary>
        /// <param name="entry">The string to be written to the TCP socket.</param>
        /// <exception cref="InvalidOperationException">The writer has been disposed and the queue no longer accepts entries.</exception>
        public void Enqueue(string entry)
        {
            _eventQueue.Enqueue(entry);
        }

        /// <summary>Invokes <see cref="LoggingFailureHandler"/> if anything is subscribed.</summary>
        private void ReportFailure(Exception ex)
        {
            LoggingFailureHandler?.Invoke(ex);
        }

        /// <summary>
        /// Opens a TCP connection to <paramref name="endpoint"/> and returns its stream,
        /// wrapped in an authenticated <see cref="SslStream"/> when the scheme is "tls".
        /// Failures are reported to the failure handler and rethrown.
        /// </summary>
        private async Task<Stream> OpenStreamAsync(Uri endpoint)
        {
            var client = new TcpClient();
            try
            {
                await client.ConnectAsync(endpoint.Host, endpoint.Port);
                Stream stream = client.GetStream();

                if (!string.Equals(endpoint.Scheme, "tls", StringComparison.OrdinalIgnoreCase))
                {
                    return stream;
                }

                var sslStream = new SslStream(stream, false, null, null);
                await sslStream.AuthenticateAsClientAsync(endpoint.Host);
                return sslStream;
            }
            catch (Exception e)
            {
                // Do not leak the socket of a connection that never became usable.
                client.Dispose();
                ReportFailure(e);
                throw;
            }
        }

        /// <summary>Encodes <paramref name="entry"/> as UTF-8 and writes and flushes it to the current stream.</summary>
        private async Task WriteEntryAsync(string entry)
        {
            byte[] message = Encoding.UTF8.GetBytes(entry);
            await _stream.WriteAsync(message, 0, message.Length);
            await _stream.FlushAsync();
        }

        /// <summary>
        /// Closes the queue to new entries and writes everything still outstanding,
        /// starting with <paramref name="pending"/> (an entry that was already dequeued
        /// but not yet sent), if any.
        /// </summary>
        private async Task DrainQueueAsync(string pending)
        {
            _eventQueue.CompleteAdding();
            // Post-condition: no further items will be added to the queue, so there
            // will be a finite number of items to handle.

            string entry = pending;
            while (entry != null || _eventQueue.Count > 0)
            {
                if (entry == null)
                {
                    entry = _eventQueue.Dequeue();
                }

                try
                {
                    await WriteEntryAsync(entry);
                }
                catch (SocketException ex)
                {
                    ReportFailure(ex);
                }

                entry = null;
            }
        }

        /// <summary>
        /// Background loop: connects, then repeatedly dequeues one entry and writes it,
        /// reconnecting after write failures, until cancellation is requested or the
        /// connection cannot be re-established.
        /// </summary>
        private async Task RunWriterLoopAsync(Uri uri, TaskCompletionSource<bool> threadReady)
        {
            try
            {
                _stream = await _reconnectPolicy.ConnectAsync(OpenStreamAsync, uri, _tokenSource.Token);
                threadReady.TrySetResult(true); // Signal the calling thread that we are ready.

                string entry = null;
                while (_stream != null) // null indicates that connecting was cancelled.
                {
                    if (_tokenSource.Token.IsCancellationRequested)
                    {
                        await DrainQueueAsync(entry);
                        break;
                    }

                    if (entry == null)
                    {
                        try
                        {
                            entry = _eventQueue.Dequeue(_tokenSource.Token);
                        }
                        catch (OperationCanceledException)
                        {
                            // Normal shutdown while idle; the cancellation check at the
                            // top of the loop takes over from here.
                        }

                        continue;
                    }

                    try
                    {
                        await WriteEntryAsync(entry);

                        // No exception, it was sent.
                        entry = null;
                    }
                    catch (Exception ex) when (ex is IOException || ex is SocketException)
                    {
                        ReportFailure(ex);

                        // Release the broken connection before replacing it; the entry
                        // stays pending and is retried on the new stream.
                        _stream.Dispose();
                        _stream = await _reconnectPolicy.ConnectAsync(OpenStreamAsync, uri, _tokenSource.Token);
                    }
                }
            }
            catch (Exception e)
            {
                ReportFailure(e);
            }
            finally
            {
                if (_stream != null)
                {
                    _stream.Dispose();
                }

                // Unblock the constructor if the first connection attempt failed fatally.
                threadReady.TrySetResult(false);
                _disposed.SetResult(true);
            }
        }
    }

    /// <summary>
    /// Reconnection policy that retries a connection after increasingly long intervals.
    /// </summary>
    /// <remarks>
    /// The first attempt is made immediately. After the n-th failed attempt the policy
    /// waits min(10 minutes, 2^n - 1 seconds) before trying again: 1s, 3s, 7s, 15s, ...
    /// until the interval plateaus at 10 minutes.
    /// </remarks>
    public class ExponentialBackoffTcpReconnectionPolicy
    {
        private const int CeilingSeconds = 10 * 60; // 10 minutes in seconds

        /// <summary>
        /// Calls <paramref name="connect"/> until it succeeds or cancellation is requested.
        /// Only <see cref="SocketException"/> is retried; any other exception thrown by
        /// <paramref name="connect"/> propagates to the caller.
        /// </summary>
        /// <param name="connect">Opens a stream to the given URI.</param>
        /// <param name="host">The URI handed to <paramref name="connect"/> on every attempt.</param>
        /// <param name="cancellationToken">Stops further attempts and interrupts the back-off delay.</param>
        /// <returns>The connected stream, or null when cancelled before a connection was made.</returns>
        public async Task<Stream> ConnectAsync(Func<Uri, Task<Stream>> connect, Uri host, CancellationToken cancellationToken)
        {
            int delay = 1; // in seconds

            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    //Log.Debug("Attempting to connect to TCP endpoint {host} after delay of {delay} seconds", host, delay);
                    return await connect(host);
                }
                catch (SocketException)
                {
                    // Retried after the back-off delay below.
                }

                try
                {
                    await Task.Delay(TimeSpan.FromSeconds(delay), cancellationToken);
                }
                catch (OperationCanceledException)
                {
                    // Task.Delay throws when the token fires; cancellation is reported
                    // to the caller as a null stream, not as an exception.
                    return null;
                }

                // The nth delay is min(10 minutes, 2^n - 1 seconds).
                delay = Math.Min((delay + 1) * 2 - 1, CeilingSeconds);
            }

            // cancellationToken has been cancelled.
            return null;
        }
    }

    /// <summary>
    /// A queue with a maximum size. When the queue is at its maximum size
    /// and a new item is queued, the oldest item in the queue is dropped.
    /// </summary>
    /// <typeparam name="T">The type of the queued items.</typeparam>
    internal class FixedSizeQueue<T>
    {
        // Private gate: serialises Enqueue/CompleteAdding without exposing the lock to callers.
        private readonly object _gate = new object();
        private readonly BlockingCollection<T> _collection = new BlockingCollection<T>();

        private int Size { get; }

        private bool IsCompleted { get; set; }

        /// <summary>Creates a queue that keeps at most <paramref name="size"/> items.</summary>
        public FixedSizeQueue(int size)
        {
            Size = size;
            IsCompleted = false;
        }

        /// <summary>
        /// Adds an item, discarding the oldest items while the queue holds more than its maximum size.
        /// </summary>
        /// <exception cref="InvalidOperationException"><see cref="CompleteAdding"/> has been called.</exception>
        public void Enqueue(T obj)
        {
            lock (_gate)
            {
                if (IsCompleted)
                {
                    throw new InvalidOperationException("Tried to add an item to a completed queue.");
                }

                _collection.Add(obj);

                // TryTake never blocks, so a consumer emptying the queue concurrently
                // cannot leave this thread waiting while it holds the gate.
                T dropped;
                while (_collection.Count > Size && _collection.TryTake(out dropped))
                {
                }
            }
        }

        /// <summary>Marks the queue as closed; later <see cref="Enqueue"/> calls throw.</summary>
        public void CompleteAdding()
        {
            lock (_gate)
            {
                IsCompleted = true;
            }
        }

        /// <summary>Removes and returns the oldest item, blocking until one is available or the token is cancelled.</summary>
        /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was cancelled.</exception>
        public T Dequeue(CancellationToken cancellationToken)
        {
            return _collection.Take(cancellationToken);
        }

        /// <summary>Removes and returns the oldest item, blocking until one is available.</summary>
        public T Dequeue()
        {
            return _collection.Take();
        }

        /// <summary>The number of items currently queued.</summary>
        public decimal Count => _collection.Count;
    }
}