2017년 8월 29일 화요일

Developing ElliottBrowser for Quandl.Com - 3


Downloading / Storing Chart Data Incrementally - 3


    In this blog post, let's talk about the database usage of ElliottBrowser. As noted before, it's necessary to use a local data store to enable the idea of 'incremental download', or 'fresh-download first and append later'.

    Then, what should we use as a local data store?  The most obvious choice may be an RDBMS such as MySQL or PostgreSQL, or a NoSQL database. But because the data structure under consideration, namely the time series behind stock charts, is so simple and regular, a full database system may be overkill. Even so, storing the data in plain local files is not practical either, because we would then have to take care of all the I/O and the additional processing ourselves.

    So let's assume that we will use a database system, though not any specific one, and talk about what else we need in that case. The main concern is how to reduce the dependency on a specific database while leaving developers as free as possible to choose the database they prefer.

▷ Functional Design for Database Accesses


    1. ElliottBrowser adopts a wrapper class, 'WnFDbConnectionWrapper', so that we can respond easily to database changes. ※ For those who want to use a database other than MS SQL Server Compact, it's enough to implement a subclass of the wrapper class.

    2. We keep the wrapper class as simple as possible. Its functions are as follows:
       - Creating a wrapper object per database (F.Req.1)
       - Getting the last candlestick per chart data, if any (F.Req.2)
       - Inserting / appending / getting candlesticks per chart data (F.Req.3~5)


▷ Some Technical Specifics


    1. Abstracting database access

/// <summary>
/// Identifies the database backend a connection wrapper is created for.
/// The numeric values are spelled out so they stay stable if members are
/// reordered or new ones are inserted.
/// </summary>
public enum WnF_DBType : int
{
    /// <summary>MS SQL Server Compact.</summary>
    SQLCE = 0,

    /// <summary>MySQL.</summary>
    MySQL = 1,

    /// <summary>SQLite.</summary>
    SQLite = 2
}

/// <summary>
/// Database-agnostic base class for storing chart data. It owns a single
/// <see cref="DbConnection"/> and exposes the operations the application needs:
/// creating a wrapper (F.Req.1), looking up the last stored row of a table
/// (F.Req.2) and inserting / appending / reading rows (F.Req.3~5).
/// Backend-specific work is delegated to the virtual members, which do nothing
/// in this base class and are meant to be overridden by a subclass.
/// </summary>
public class WnFDbConnectionWrapper : IDisposable
{
    /// <summary>
    /// Stores the backend type and connection string. No connection is opened
    /// here; call <see cref="InitConnection"/> for that.
    /// </summary>
    /// <param name="k">Backend type this wrapper is created for.</param>
    /// <param name="s">Connection string; must not be null or empty.</param>
    /// <exception cref="ArgumentException"><paramref name="s"/> is null or empty.</exception>
    public WnFDbConnectionWrapper(WnF_DBType k, string s)
    {
        // The parameter name reported must be the real parameter ("s"), not the field it is stored in.
        if (string.IsNullOrEmpty(s))
            throw new ArgumentException("Connection String Empty", "s");
        connStr = s;
        type = k;
    }

    // F.Req.1
    /// <summary>
    /// Creates a wrapper for the requested backend and opens its connection.
    /// </summary>
    /// <param name="k">Backend type; only <see cref="WnF_DBType.SQLCE"/> is implemented.</param>
    /// <param name="s">Connection string passed on to the wrapper.</param>
    /// <returns>A wrapper whose connection was initialised successfully.</returns>
    /// <exception cref="NotImplementedException"><paramref name="k"/> is not SQLCE.</exception>
    public static WnFDbConnectionWrapper GetWrapper(WnF_DBType k, string s)
    {
        if (k != WnF_DBType.SQLCE)
            throw new NotImplementedException();

        // NOTE(review): this retries every 10 ms without an upper bound, exactly as
        // before. A permanently unusable connection string therefore never returns.
        // Consider a retry limit once callers can handle a failure.
        while (true)
        {
            WnFDbConnectionWrapper wrapper = new SqlCeWrapper(k, s);
            if (wrapper.InitConnection())
                return wrapper;

            wrapper.Dispose();
            Thread.Sleep(10);
        }
    }

    #region " IDisposable Support "
    /// <summary>Keeps track of whether the object has been disposed.</summary>
    protected bool disposed = false;

    /// <summary>
    /// Releases the resources held by this object.
    /// </summary>
    /// <param name="disposing">
    /// True when called from <see cref="Dispose()"/>; managed resources (the
    /// connection) are released only in that case.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (disposed)
            return;

        if (disposing && conn != null)
        {
            // Dispose also closes the connection and additionally lets the
            // provider release what Close() alone may keep alive.
            conn.Dispose();
        }

        disposed = true;
    }

    /// <summary>
    /// Disposes the wrapper. Cleanup code belongs in <see cref="Dispose(bool)"/>.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }
    #endregion

    /// <summary>The underlying connection, or null if none has been set up.</summary>
    public DbConnection Connection
    {
        get { return conn; }
    }

    /// <summary>
    /// Runs the backend-specific <see cref="SetupConnection"/>.
    /// </summary>
    /// <returns>True if a connection object exists afterwards.</returns>
    public bool InitConnection()
    {
        SetupConnection();
        return (conn != null);
    }

    // F.Req.2
    /// <summary>
    /// Resolves the table name for the given arguments and either returns the
    /// last stored row (table exists) or creates the table (table missing).
    /// </summary>
    /// <param name="p">Number appended to the table name (see <see cref="TableName"/>).</param>
    /// <param name="s">Name the table name is derived from.</param>
    /// <param name="sn">Receives the resolved table name.</param>
    /// <returns>
    /// The result of <see cref="LastRow"/> if the table exists; otherwise
    /// <c>default(StockOHLCV)</c>.
    /// </returns>
    public StockOHLCV CheckTable(int p, string s, out string sn)
    {
        StockOHLCV lastC = default(StockOHLCV);
        sn = TableName(p, s);
        if (Exists(sn))
            lastC = LastRow(sn);
        else
            CreateTable(sn);   // CreateTable takes only the table name; p is already part of sn.
        return lastC;
    }

    // F.Req.3
    /// <summary>
    /// Inserts all rows of <paramref name="dt"/>. The base implementation does nothing.
    /// </summary>
    /// <param name="dt">Rows to insert.</param>
    public virtual void InsertTable(DataTable dt) { }

    // F.Req.4
    /// <summary>
    /// Appends <paramref name="dt"/> to its table: the stored rows matching the
    /// first row's date prefix are deleted (see <see cref="DeleteRow"/>), then
    /// all rows are inserted.
    /// </summary>
    /// <param name="dt">Rows to append; its TableName is used as the target table.</param>
    /// <returns>The number of rows in <paramref name="dt"/>; 0 if it has no rows.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="dt"/> is null.</exception>
    /// <exception cref="InvalidOperationException">Deleting the overlapping rows failed.</exception>
    public int AppendTable(DataTable dt)
    {
        if (dt == null)
            throw new ArgumentNullException("dt");

        // Nothing to append; avoids indexing Rows[0] on an empty table.
        if (dt.Rows.Count == 0)
            return 0;

        if (!DeleteRow(dt.Rows[0]))
            throw new InvalidOperationException("DeleteRow failed");

        InsertTable(dt);
        return dt.Rows.Count;
    }

    // F.Req.5
    /// <summary>
    /// Reads rows into <paramref name="dt_in"/>. The base implementation reads
    /// nothing and returns 0.
    /// </summary>
    /// <returns>The number of rows read.</returns>
    public virtual int FillRows(int p, string s, int n, ref DataTable dt_in)
    {
        return 0;
    }


    /// <summary>Backend type given to the constructor.</summary>
    protected WnF_DBType type;
    /// <summary>Connection string given to the constructor.</summary>
    protected string connStr;
    /// <summary>Connection created by <see cref="SetupConnection"/>; null until then or on failure.</summary>
    protected DbConnection conn;


    /// <summary>
    /// Creates and opens <see cref="conn"/>. The base implementation does nothing.
    /// </summary>
    protected virtual void SetupConnection() { }

    /// <summary>
    /// Tells whether the table <paramref name="cname"/> exists. The base
    /// implementation always returns false.
    /// </summary>
    protected virtual bool Exists(string cname)
    {
        return false;
    }

    /// <summary>
    /// Returns the last row of table <paramref name="cname"/>. The base
    /// implementation returns <c>default(StockOHLCV)</c>.
    /// </summary>
    protected virtual StockOHLCV LastRow(string cname)
    {
        return default(StockOHLCV);
    }

    /// <summary>
    /// Creates the table <paramref name="cname"/>. The base implementation does nothing.
    /// </summary>
    protected virtual void CreateTable(string cname) { }

    /// <summary>
    /// Builds a table name from <paramref name="s"/> and <paramref name="p"/>:
    /// an all-numeric <paramref name="s"/> gets a leading underscore, the
    /// characters '.', '#', '@' and '-' are replaced by letter codes, and
    /// "_" + <paramref name="p"/> is appended.
    /// </summary>
    /// <param name="p">Number appended as the suffix.</param>
    /// <param name="s">Source name; must not be null.</param>
    protected string TableName(int p, string s)
    {
        int ignored;
        string t = int.TryParse(s, out ignored) ? "_" + s : s;

        // The replacement tokens are part of already-stored table names, so they
        // must stay exactly as they are.
        return t.Replace(".", "D_").Replace("#", "_S_").Replace("@", "_AT_").Replace("-", "_DS_") + "_" + p;
    }

    /// <summary>
    /// Deletes every stored row whose DateTime text starts with the first ten
    /// characters of <paramref name="dr"/>'s "DateTime" value.
    /// </summary>
    /// <param name="dr">Row supplying the table name (via its DataTable) and the date prefix.</param>
    /// <returns>True if the delete statement ran; false if it failed or the row has no usable date.</returns>
    protected bool DeleteRow(DataRow dr)
    {
        try
        {
            // A DBNull / non-string / too-short value is reported as a failure
            // instead of escaping as a cast or range exception.
            string stamp = dr["DateTime"] as string;
            if (stamp == null || stamp.Length < 10)
            {
                Console.WriteLine("[WnFDbConnectionWrapper.DeleteRow()] Exception: DateTime value is missing or shorter than 10 characters");
                return false;
            }

            using (DbCommand cmd = conn.CreateCommand())
            {
                // Identifiers cannot be bound as parameters, so the table name is
                // still concatenated; the date value is bound as a parameter.
                cmd.CommandText = "delete from " + dr.Table.TableName + " where DateTime Like @datePrefix";

                DbParameter prefix = cmd.CreateParameter();
                prefix.ParameterName = "@datePrefix";
                prefix.Value = stamp.Substring(0, 10) + "%";
                cmd.Parameters.Add(prefix);

                cmd.ExecuteNonQuery();
            }
            return true;
        }
        catch (Exception ex)
        {
            Console.WriteLine("[WnFDbConnectionWrapper.DeleteRow()] Exception: " + ex.Message);
            return false;
        }
    }
}


    
    2. Wrapper for MS SQL Server Compact

/// <summary>
/// <see cref="WnFDbConnectionWrapper"/> implementation for MS SQL Server Compact.
/// </summary>
public class SqlCeWrapper : WnFDbConnectionWrapper
{
    /// <summary>Passes the backend type and connection string to the base class.</summary>
    public SqlCeWrapper(WnF_DBType k, string s) : base(k, s)
    {
    }

    /// <summary>
    /// Creates the database file if it does not exist yet, then opens the
    /// connection. On any failure the error is written to the console and
    /// <c>conn</c> is left null.
    /// </summary>
    protected override void SetupConnection()
    {
        try
        {
            string dbFile = DataSourcePath(connStr);
            if (!File.Exists(dbFile))
            {
                // SqlCeEngine is disposable; release it as soon as the file is created.
                using (SqlCeEngine engine = new SqlCeEngine(connStr))
                {
                    engine.CreateDatabase();
                }
            }
            conn = new SqlCeConnection(connStr);
            conn.Open();
        }
        catch (Exception ex)
        {
            Console.WriteLine("Exception at SqlCeWrapper.SetupConn()\r\n" + ex.Message);
            if (conn != null)
            {
                conn.Dispose();
                conn = null;
            }
        }
    }

    /// <summary>
    /// Extracts the database file path from a connection string. The
    /// "Data Source" / "DataSource" keyword is looked up wherever it appears;
    /// if neither is present, the value of the first key=value segment is used.
    /// </summary>
    private static string DataSourcePath(string connectionString)
    {
        DbConnectionStringBuilder parts = new DbConnectionStringBuilder();
        parts.ConnectionString = connectionString;

        object path;
        if (parts.TryGetValue("Data Source", out path) || parts.TryGetValue("DataSource", out path))
            return Convert.ToString(path);

        // Fallback: assume the first segment carries the path.
        return connectionString.Split(';')[0].Split('=')[1];
    }

    /// <summary>
    /// Tells whether a table named <paramref name="cname"/> is listed in
    /// INFORMATION_SCHEMA.TABLES.
    /// </summary>
    /// <returns>True if at least one matching table exists; false for a null or empty name.</returns>
    protected override bool Exists(string cname)
    {
        if (string.IsNullOrEmpty(cname))
            return false;

        using (SqlCeCommand cmd = ((SqlCeConnection)conn).CreateCommand())
        {
            // The name is compared as a value here, so it can be bound as a parameter.
            cmd.CommandText = "SELECT Count(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME=@tableName";
            cmd.Parameters.AddWithValue("@tableName", cname);
            return Convert.ToInt32(cmd.ExecuteScalar()) > 0;
        }
    }

    /// <summary>
    /// Reads the greatest DateTime value stored in table <paramref name="cname"/>.
    /// </summary>
    /// <returns>
    /// A <c>StockOHLCV</c> built from that DateTime text with all numeric fields 0
    /// (the text is empty when the table has no rows), or
    /// <c>default(StockOHLCV)</c> if the query failed.
    /// </returns>
    protected override StockOHLCV LastRow(string cname)
    {
        StockOHLCV lastC = default(StockOHLCV);
        using (SqlCeCommand cmd = ((SqlCeConnection)conn).CreateCommand())
        {
            // ExecuteScalar returns the first column of the first row, i.e. DateTime.
            cmd.CommandText = "select * from " + cname + " where DateTime IN (select MAX(DateTime) from " + cname + ")";
            try
            {
                lastC = new StockOHLCV(Convert.ToString(cmd.ExecuteScalar()), 0, 0, 0, 0, 0);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception at SqlCeWrapper.LastItem()\r\n" + ex.Message);
            }
        }
        return lastC;
    }

    /// <summary>
    /// Inserts every row of <paramref name="dt"/> into the table named by
    /// <c>dt.TableName</c> through an updatable result set. Values are copied by
    /// column position, so the column order of <paramref name="dt"/> must match
    /// the table's.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="dt"/> is null.</exception>
    public override void InsertTable(System.Data.DataTable dt)
    {
        if (dt == null)
            throw new ArgumentNullException("dt");

        using (SqlCeCommand cmd = new SqlCeCommand())
        {
            cmd.Connection = (SqlCeConnection)conn;
            cmd.CommandText = dt.TableName;
            cmd.CommandType = CommandType.TableDirect;

            using (SqlCeResultSet rs = cmd.ExecuteResultSet(ResultSetOptions.Updatable | ResultSetOptions.Scrollable))
            {
                foreach (DataRow r in dt.Rows)
                {
                    SqlCeUpdatableRecord record = rs.CreateRecord();
                    foreach (DataColumn col in dt.Columns)
                    {
                        // col.Ordinal is the column's index; no per-cell IndexOf search needed.
                        record.SetValue(col.Ordinal, r[col]);
                    }
                    rs.Insert(record);
                }
            }
        }
    }

    /// <summary>Column list used when creating a chart-data table.</summary>
    private string _create_fields()
    {
        return " (DateTime NVARCHAR(19) PRIMARY KEY, [Open] REAL, High REAL, Low REAL, [Close] REAL, Volume REAL)";
    }

    /// <summary>
    /// Creates table <paramref name="cname"/> inside a transaction. A failed
    /// attempt is rolled back and retried up to three more times, 2 s apart.
    /// If the transaction itself cannot be started because the database is
    /// "locked", it waits 1 s and tries again. Failures are written to the
    /// console; no exception is raised for a failed create.
    /// </summary>
    protected override void CreateTable(string cname)
    {
        const int MaxRetries = 3;
        string sql = "create table " + cname + _create_fields();
        SqlCeConnection ceConn = (SqlCeConnection)conn;
        int rcnt = 0;

        while (true)
        {
            try
            {
                // A rolled-back transaction cannot be reused, so every attempt
                // gets its own transaction and command.
                using (SqlCeTransaction trn = ceConn.BeginTransaction())
                using (SqlCeCommand cmd = ceConn.CreateCommand())
                {
                    cmd.CommandText = sql;
                    cmd.Transaction = trn;
                    try
                    {
                        cmd.ExecuteNonQuery();
                        trn.Commit();
                        return;
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("Exception at SqlCeWrapper.CreateTable(), Retry count " + rcnt + "\r\n" + ex.Message);
                        try
                        {
                            trn.Rollback();
                        }
                        catch (InvalidOperationException)
                        {
                            // The transaction has already completed; nothing left to undo.
                        }
                    }
                }
            }
            catch (SqlCeException ex)
            {
                // Ordinal search anywhere in the message, including position 0.
                if (ex.Message.IndexOf("locked", StringComparison.Ordinal) < 0)
                {
                    Console.WriteLine("Exception at SqlCeWrapper.CreateTable()\r\n" + ex.Message);
                    return;
                }

                // NOTE(review): a lock that never clears keeps this waiting, as before.
                Thread.Sleep(1000);
                continue;
            }

            if (rcnt >= MaxRetries)
                return;

            rcnt += 1;
            Thread.Sleep(2000);
        }
    }

    /// <summary>
    /// Loads the <paramref name="n"/> rows with the greatest DateTime from table
    /// <paramref name="s"/> into <paramref name="dt_in"/>, in ascending DateTime
    /// order. An auto-incrementing "row_num" column (seed 0) is added to
    /// <paramref name="dt_in"/> if it is not there yet.
    /// </summary>
    /// <param name="p">Not used by this implementation.</param>
    /// <param name="s">
    /// Table name, used as-is in the query. NOTE(review): presumably the name
    /// returned through CheckTable's out parameter; confirm against the callers.
    /// </param>
    /// <param name="n">Maximum number of rows to read.</param>
    /// <param name="dt_in">Target table; a new one is created if null is passed.</param>
    /// <returns>The number of rows read from the database; 0 if the read failed.</returns>
    public override int FillRows(int p, string s, int n, ref DataTable dt_in)
    {
        int ccnt = 0;

        if (dt_in == null)
            dt_in = new DataTable();

        // Adding the column twice would throw, e.g. when the same table is filled again.
        if (!dt_in.Columns.Contains("row_num"))
        {
            DataColumn col = new DataColumn("row_num", typeof(Int32));
            col.AutoIncrement = true;
            col.AutoIncrementSeed = 0;
            dt_in.Columns.Add(col);
        }

        using (SqlCeCommand cmd = ((SqlCeConnection)conn).CreateCommand())
        {
            cmd.CommandText = "select TOP (" + n + ") * from " + s + " order by DateTime desc";

            using (SqlCeDataAdapter da = new SqlCeDataAdapter(cmd))
            using (DataTable newestFirst = new DataTable())
            {
                try
                {
                    ccnt = da.Fill(newestFirst);
                    if (ccnt > 0)
                    {
                        // The query returns newest first; callers get oldest first.
                        using (DataTable oldestFirst = newestFirst.Select(string.Empty, "DateTime Asc").CopyToDataTable())
                        using (DataTableReader reader = new DataTableReader(oldestFirst))
                        {
                            dt_in.BeginLoadData();
                            dt_in.Load(reader);
                            dt_in.EndLoadData();
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Exception at SqlCeWrapper.FillCandles()\r\n" + ex.Message);
                }
            }
        }

        return ccnt;
    }

}



댓글 없음:

댓글 쓰기