/*
 * Copyright 2014 Splunk, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"): you may
 * not use this file except in compliance with the License. You may obtain
 * a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

using Serilog;
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Security.Authentication;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;

namespace Admin.Core.Serilog.Es
{
    /// <summary>
    /// TcpSocketWriter encapsulates queueing strings to be written to a TCP _socket
    /// and handling reconnections (according to a TcpConnectionPolicy object passed
    /// to it) when a TCP session drops.
    /// </summary>
    /// <remarks>
    /// TcpSocketWriter maintains a fixed sized queue of strings to be sent via
    /// the TCP _port and, while the _socket is open, sends them as quickly as possible.
    ///
    /// If the TCP session drops, TcpSocketWriter will stop pulling strings off the
    /// queue until it can reestablish a connection. Any SocketErrors emitted during this
    /// process will be passed as arguments to invocations of LoggingFailureHandler.
    /// If the TcpConnectionPolicy.Connect method throws an exception (in particular,
    /// TcpReconnectFailure to indicate that the policy has reached a point where it
    /// will no longer try to establish a connection) then the LoggingFailureHandler
    /// event is invoked, and no further attempt to log anything will be made.
    /// </remarks>
    public class TcpSocketWriter : IDisposable
    {
        private readonly FixedSizeQueue<string> _eventQueue;
        private readonly ExponentialBackoffTcpReconnectionPolicy _reconnectPolicy = new ExponentialBackoffTcpReconnectionPolicy();
        private readonly CancellationTokenSource _tokenSource; // Must be private or Dispose will not function properly.
        private readonly TaskCompletionSource<bool> _disposed = new TaskCompletionSource<bool>();

        private Stream _stream;

        /// <summary>
        /// Event that is invoked when reconnecting after a TCP session is dropped fails.
        /// </summary>
        public event Action<Exception> LoggingFailureHandler = ex =>
        {
            UnexpectedErrorLogger(
                ex, 
                (x, socketError) =>
                {
                    if (socketError == null)
                    {
                        //Log.Error(x, "failure inside TCP socket: {message}", x.Message);
                    }
                    else
                    {
                        //Log.Error(
                        //    x,
                        //    "failure inside TCP socket: {message} - socket error found {socketErrorCode}",
                        //    x.Message,
                        //    socketError);
                    }
                    
                });
        };

        public static void UnexpectedErrorLogger(Exception ex, Action<Exception, SocketError?> log)
        {
            SocketError? socketErrorCode = null;
            var current = ex;
            do
            {
                if (current is SocketException)
                {
                    socketErrorCode = ((SocketException) current).SocketErrorCode;
                }

                current = current.InnerException;
            } while (socketErrorCode == null && current != null);

            log(ex, socketErrorCode);
        }

        /// <summary>
        /// Construct a TCP _socket writer that writes to the given endPoint and _port.
        /// </summary>
        /// <param name="uri">Uri to open a TCP socket to.</param>
        /// <param name="maxQueueSize">The maximum number of log entries to queue before starting to drop entries.</param>
        public TcpSocketWriter(Uri uri, int maxQueueSize = 5000)
        {
            _eventQueue = new FixedSizeQueue<string>(maxQueueSize);
            _tokenSource = new CancellationTokenSource();

            Func<Uri, Task<Stream>> tryOpenSocket = async h =>
            {
                try
                {
                    TcpClient client = new TcpClient();
                    await client.ConnectAsync(uri.Host, uri.Port);
                    Stream stream = client.GetStream();
                    if (uri.Scheme.ToLower() != "tls")
                        return stream;

                    var sslStream = new SslStream(client.GetStream(), false, null, null);
                    await sslStream.AuthenticateAsClientAsync(uri.Host);
                    return sslStream;
                }
                catch (Exception e)
                {
                    LoggingFailureHandler(e);
                    throw;
                }
            };

            var threadReady = new TaskCompletionSource<bool>();

            Task queueListener = Task.Factory.StartNew(async () =>
            {
                try
                {
                    bool sslEnabled = uri.Scheme.ToLower() == "tls";
                    _stream = await _reconnectPolicy.ConnectAsync(tryOpenSocket, uri, _tokenSource.Token);
                    threadReady.SetResult(true); // Signal the calling thread that we are ready.

                    string entry = null;
                    while (_stream != null) // null indicates that the thread has been cancelled and cleaned up.
                    {
                        if (_tokenSource.Token.IsCancellationRequested)
                        {
                            _eventQueue.CompleteAdding();
                            // Post-condition: no further items will be added to the queue, so there will be a finite number of items to handle.
                            while (_eventQueue.Count > 0)
                            {
                                entry = _eventQueue.Dequeue();
                                try
                                {
                                    byte[] messsage = Encoding.UTF8.GetBytes(entry);
                                    await _stream.WriteAsync(messsage, 0, messsage.Length);
                                    await _stream.FlushAsync();
                                }
                                catch (SocketException ex)
                                {
                                    LoggingFailureHandler(ex);
                                }
                            }
                            break;
                        }
                        if (entry == null)
                        {
                            entry = _eventQueue.Dequeue(_tokenSource.Token);
                        }
                        else
                        {
                            try
                            {
                                byte[] messsage = Encoding.UTF8.GetBytes(entry);
                                await _stream.WriteAsync(messsage, 0, messsage.Length);
                                await _stream.FlushAsync();
                                // No exception, it was sent
                                entry = null;
                            }
                            catch (IOException ex)
                            {
                                LoggingFailureHandler(ex);
                                _stream = await _reconnectPolicy.ConnectAsync(tryOpenSocket, uri, _tokenSource.Token);
                            }
                            catch (SocketException ex)
                            {
                                LoggingFailureHandler(ex);
                                _stream = await _reconnectPolicy.ConnectAsync(tryOpenSocket, uri, _tokenSource.Token);
                            }
                        }
                    }
                }
                catch (Exception e)
                {
                    LoggingFailureHandler(e);
                }
                finally
                {
                    if (_stream != null)
                    {
                        _stream.Dispose();
                    }

                    _disposed.SetResult(true);
                }
            }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
            threadReady.Task.Wait(TimeSpan.FromSeconds(5));
        }

        public void Dispose()
        {
            // The following operations are idempotent. Issue a cancellation to tell the
            // writer thread to stop the queue from accepting entries and write what it has
            // before cleaning up, then wait until that cleanup is finished.
            _tokenSource.Cancel();
            Task.Run(async () => await _disposed.Task).Wait();
        }

        /// <summary>
        /// Push a string onto the queue to be written.
        /// </summary>
        /// <param name="entry">The string to be written to the TCP _socket.</param>
        public void Enqueue(string entry)
        {
            _eventQueue.Enqueue(entry);
        }
    }

    /// <summary>
    /// TcpConnectionPolicy implementation that tries to reconnect after
    /// increasingly long intervals.
    /// </summary>
    /// <remarks>
    /// The intervals double every time, starting from 0s, 1s, 2s, 4s, ...
    /// until 10 minutes between connections, when it plateaus and does
    /// not increase the interval length any further.
    /// </remarks>
    public class ExponentialBackoffTcpReconnectionPolicy
    {
        private readonly int ceiling = 10 * 60; // 10 minutes in seconds

        public async Task<Stream> ConnectAsync(Func<Uri, Task<Stream>> connect, Uri host, CancellationToken cancellationToken)
        {
            int delay = 1; // in seconds
            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    //Log.Debug("Attempting to connect to TCP endpoint {host} after delay of {delay} seconds", host, delay);
                    return await connect(host);
                }
                catch (SocketException) { }

                // If this is cancelled via the cancellationToken instead of
                // completing its delay, the next while-loop test will fail,
                // the loop will terminate, and the method will return null
                // with no additional connection attempts.
                await Task.Delay(delay * 1000, cancellationToken);
                // The nth delay is min(10 minutes, 2^n - 1 seconds).
                delay = Math.Min((delay + 1) * 2 - 1, ceiling);
            }

            // cancellationToken has been cancelled.
            return null;
        }
    }

    /// <summary>
    /// A queue with a maximum size. When the queue is at its maximum size
    /// and a new item is queued, the oldest item in the queue is dropped.
    /// </summary>
    /// <typeparam name="T"></typeparam>
    internal class FixedSizeQueue<T>
    {
        private int Size { get; }
        private readonly IProgress<bool> _progress = new Progress<bool>();
        private bool IsCompleted { get; set; }

        private readonly BlockingCollection<T> _collection = new BlockingCollection<T>();

        public FixedSizeQueue(int size)
        {
            Size = size;
            IsCompleted = false;
        }

        public void Enqueue(T obj)
        {
            lock (this)
            {
                if (IsCompleted)
                {
                    throw new InvalidOperationException("Tried to add an item to a completed queue.");
                }

                _collection.Add(obj);

                while (_collection.Count > Size)
                {
                    _collection.Take();
                }
                _progress.Report(true);
            }
        }

        public void CompleteAdding()
        {
            lock (this)
            {
                IsCompleted = true;
            }
        }

        public T Dequeue(CancellationToken cancellationToken)
        {
            return _collection.Take(cancellationToken);
        }

        public T Dequeue()
        {
            return _collection.Take();
        }


        public decimal Count => _collection.Count;
    }
}