
Commit 360229a

Author: Brian
Commit message: add data manager/downloader
Parent: 80218dc

4 files changed: +186, -65 lines

pytradelib/data.py

Lines changed: 26 additions & 7 deletions
@@ -1,10 +1,29 @@
+import os
+from datetime import datetime
+from pytradelib.store import CSVStore
+from pytradelib.quandl.wiki import QuandlDailyWikiProvider
+from pytradelib.settings import DATA_DIR
 
-from pytradelib.utils import get_parse_symbols, csv_to_df
-from pytradelib.quandl import _construct_url as construct_url_quandl
-from pytradelib.quandl import _deconstruct_url as deconstruct_url_quandl
+class DataManager(object):
+    def __init__(self, store=None, data_provider=None):
+        self._store = store or CSVStore()
+        self._provider = data_provider or QuandlDailyWikiProvider()
 
+    def initialize_store(self):
+        raise NotImplementedError
 
-def get_symbols_quandl(symbols, start='2010-01-01', end='2010-08-31', interval=None):
-    if not isinstance(symbols, list):
-        symbols = [symbols]
-    return get_parse_symbols(symbols, start, end, interval, construct_url_quandl, deconstruct_url_quandl, csv_to_df)
+    def update_store(self):
+        symbols = dict([(symbol, {'start': self._store.get_end_date(symbol),
+                                  'end': datetime.now()})
+                        for symbol in self._store.symbols])
+        self._store.set_dfs(self._provider.download(symbols))
+
+    def analyze(self):
+        results = self._store.analyze()
+        filename = '%s-analysis.csv' % datetime.now().strftime('%Y-%m-%d')
+        results.to_csv(os.path.join(DATA_DIR, filename))
+        return results
+
+if __name__ == '__main__':
+    data_manager = DataManager(CSVStore(), QuandlDailyWikiProvider())
+    data_manager.update_store()
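
For context, here is roughly how the new DataManager is meant to be driven (a minimal sketch based on the __main__ block above; it assumes CSVStore and QuandlDailyWikiProvider construct without required arguments, as shown there):

    from pytradelib.data import DataManager

    dm = DataManager()      # defaults to CSVStore() and QuandlDailyWikiProvider()
    dm.update_store()       # refreshes each stored symbol from its last end date to now
    results = dm.analyze()  # also writes a dated YYYY-MM-DD-analysis.csv into DATA_DIR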

pytradelib/downloader.py

Lines changed: 74 additions & 0 deletions
@@ -0,0 +1,74 @@
+from __future__ import print_function
+
+import urllib3.contrib.pyopenssl
+from pytradelib.utils import batch
+
+urllib3.contrib.pyopenssl.inject_into_urllib3()
+
+import requests
+import grequests
+from gevent import monkey
+monkey.patch_all()
+
+from pytradelib.logger import logger
+
+
+class Downloader(object):
+    def __init__(self, batch_size=100, sleep=None):
+        self._batch_size = batch_size
+        self._sleep = sleep
+
+    @property
+    def batch_size(self):
+        return self._batch_size
+
+    @batch_size.setter
+    def batch_size(self, batch_size):
+        self._batch_size = batch_size
+
+    @property
+    def sleep(self):
+        return self._sleep
+
+    @sleep.setter
+    def sleep(self, sleep):
+        self._sleep = sleep
+
+    def download(self, urls):
+        if isinstance(urls, str):
+            return self._download(urls)
+        return self._bulk_download(urls)
+
+    def _download(self, url):
+        logger.info('Download started: ' + url)
+        try:
+            r = requests.get(url)
+            logger.info('Download completed: ' + url)
+            if r.status_code == 200:
+                return r.content
+            r.raise_for_status()
+        except requests.exceptions.Timeout as e:
+            logger.error('Connection timed out: ' + e.__str__())
+        except requests.exceptions.RequestException as e:
+            logger.error('Error downloading: ' + e.__str__())
+        return None
+
+    def _bulk_download(self, urls):
+        results = []
+        for batched_urls in batch(urls, self.batch_size, self.sleep):
+            for r in self.__bulk_download(batched_urls):
+                print('finished downloading ' + r.url)
+                results.append((r.url, r.content))
+        return results
+
+    def __bulk_download(self, urls, errors=None):
+        errors = errors or []
+        def exception_handler(req, ex):
+            msg = 'Failed to download ' + req.url
+            if isinstance(ex, requests.exceptions.Timeout):
+                msg = 'Connection timed out: %(ex)s (%(url)s)' % {'ex': ex.__str__(), 'url': req.url}
+            elif isinstance(ex, requests.exceptions.RequestException):
+                msg = 'Error downloading: %(ex)s (%(url)s)' % {'ex': ex, 'url': req.url}
+            errors.append(req.url)
+            logger.error(msg)
+        return grequests.map((grequests.get(url) for url in urls), exception_handler=exception_handler)
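
A quick usage sketch (the URLs are placeholders): a single string takes the synchronous requests path, while a list goes through the batched grequests path.

    from pytradelib.downloader import Downloader

    downloader = Downloader(batch_size=50, sleep=2)

    # single URL: returns the response body, or None on error
    body = downloader.download('http://example.com/')

    # list of URLs: downloaded concurrently in batches of batch_size,
    # sleeping `sleep` seconds between batches; returns (url, content) tuples
    results = downloader.download(['http://example.com/a',
                                   'http://example.com/b'])

One caveat worth knowing: grequests.map returns None in place of responses whose exceptions were swallowed by the handler, so the loop in _bulk_download assumes every request in a batch succeeded.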

pytradelib/logger.py

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+import logging
+from logging.handlers import TimedRotatingFileHandler
+
+from pytradelib.settings import LOG_LEVEL, LOG_FILENAME
+
+LEVELS = {
+    'debug': logging.DEBUG,
+    'info': logging.INFO,
+    'warning': logging.WARNING,
+    'error': logging.ERROR,
+    'critical': logging.CRITICAL,
+}
+
+logger = logging.getLogger('PyTradeLib')
+logger.setLevel(LEVELS.get(LOG_LEVEL, logging.WARNING))
+handler = TimedRotatingFileHandler(LOG_FILENAME, 'midnight')
+handler.setFormatter(logging.Formatter(
+    '%(asctime)s %(levelname)s: pytradelib.%(module)s L%(lineno)s: %(message)s',
+    '%Y-%m-%d %H:%M:%S'
+))
+logger.addHandler(handler)
+
+
+__all__ = ('logger',)
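
Downstream modules just import the preconfigured logger (LOG_LEVEL and LOG_FILENAME come from pytradelib.settings); the messages below are hypothetical:

    from pytradelib.logger import logger

    logger.info('store initialized')
    logger.warning('no data for symbol XYZ')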

pytradelib/utils.py

Lines changed: 62 additions & 58 deletions
@@ -1,73 +1,77 @@
+from __future__ import print_function
+
 import os
+import time
+import pytz
 import pandas as pd
 
-pd.set_option('io.hdf.default_format', 'table')
-
-from pandas import HDFStore
 from pandas.compat import StringIO, bytes_to_str
 
-import grequests
-from gevent import monkey
-monkey.patch_all()
-
-DATA_DIR = os.environ['HOME'] + '/.pytradelib'
-__STORE = None
+import datetime as dt
+
+def batch(list_, size, sleep=None):
+    list_ = list(list_)
+    len_ = len(list_)
+    for i in xrange((len_ + size - 1) / size):
+        start_idx = i * size
+        end_idx = (i + 1) * size
+        if end_idx > len_:
+            end_idx = len_
+        yield list_[start_idx:end_idx]
+        if sleep:
+            print('Sleeping for %d seconds' % sleep)
+            time.sleep(sleep)
+
+
+def _sanitize_dates(start, end):
+    from pandas.core.datetools import to_datetime
+    start = to_datetime(start)
+    end = to_datetime(end)
+    if start is None:
+        start = dt.datetime(2010, 1, 1)
+    if end is None:
+        end = dt.datetime.today()
+    return start, end
 
 
-def _bulk_download(urls):
-    return grequests.imap((grequests.get(url) for url in urls))
-
-def get_parse_symbols(symbols, start, end, interval, symbol_to_url, url_to_symbol, data_to_df):
-    urls = (symbol_to_url(symbol.upper(), start, end, interval) for symbol in symbols)
-    def parse_response_to_symbol_and_df(r):
-        return url_to_symbol(r.url), data_to_df(r.text)
-    data = map(parse_response_to_symbol_and_df, _bulk_download(urls))
-    bulk_persist(data)
-    return data
-
 def csv_to_df(text):
-    rs = pd.read_csv(StringIO(bytes_to_str(text)), index_col=0,
-                     parse_dates=True, na_values='-')[::-1]
+    df = pd.read_csv(StringIO(bytes_to_str(text)), index_col=0,
+                     parse_dates=True, infer_datetime_format=True,
+                     na_values='-')[::-1]
 
     # Yahoo! Finance sometimes does this awesome thing where they
     # return 2 rows for the most recent business day
-    if len(rs) > 2 and rs.index[-1] == rs.index[-2]:  # pragma: no cover
-        rs = rs[:-1]
+    if len(df) > 2 and df.index[-1] == df.index[-2]:  # pragma: no cover
+        df = df[:-1]
 
-    # Get rid of unicode characters in index name.
+    # Get rid of unicode characters in index name.
     try:
-        rs.index.name = rs.index.name.decode('unicode_escape').encode('ascii', 'ignore')
+        df.index.name = df.index.name.decode('unicode_escape').encode('ascii', 'ignore')
     except AttributeError:
         # Python 3 string has no decode method.
-        rs.index.name = rs.index.name.encode('ascii', 'ignore').decode()
-    return rs
-
-def get_store():
-    global __STORE
-    if not __STORE:
-        if not os.path.exists(DATA_DIR):
-            os.mkdir(DATA_DIR)
-        __STORE = HDFStore(DATA_DIR + '/store.hdf5')
-    return __STORE
-
-def store_path(symbol, interval):
-    return '/symbols/%s/%s' % (symbol.upper(), interval.lower())
-
-def exists(symbol, interval):
-    store = get_store()
-    return store_path(symbol, interval) in store.keys()
-
-def persist(symbol, interval, df):
-    store = get_store()
-    if exists(symbol, interval):
-        store.append(store_path(symbol, interval), df)
-    else:
-        store.put(store_path(symbol, interval), df)
-
-def bulk_persist(data):
-    for symbol_data, df in data:
-        persist(symbol_data['symbol'], symbol_data['interval'], df)
-
-def most_recent_datetime(symbol, interval):
-    store = get_store()
-    return store.get(store_path(symbol, interval)).tail(1).index[0].to_datetime()
+        df.index.name = df.index.name.encode('ascii', 'ignore').decode()
+
+    column_renames = {'Adj. Open': 'Adj Open', 'Adj. High': 'Adj High',
+                      'Adj. Low': 'Adj Low', 'Adj. Close': 'Adj Close',
+                      'Adj. Volume': 'Adj Volume'}
+    df.rename(columns=column_renames, inplace=True)
+    return df.tz_localize(pytz.UTC)
+
+
+def percent_change(from_val, to_val):
+    # coerce to float for decimal division
+    diff = float(to_val) - from_val
+    return (diff / from_val) * 100
+
+
+def crossed(value, yesterday, today, use_adjusted=True):
+    def key(price_key):
+        return 'Adj ' + price_key if use_adjusted else price_key
+    crossed_over = yesterday[key('Close')] < value < today[key('Close')]
+    crossed_under = yesterday[key('Close')] > value > today[key('Close')]
+    return crossed_over or crossed_under
+
+
+def within_percent_of_value(price, value, percent=1):
+    diff = percent * 0.01 * 0.5 * value
+    return (value - diff) < price < (value + diff)
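
To illustrate the new helpers (values are made up; note that batch relies on xrange and integer division, so this module is Python 2 only as written):

    from pytradelib.utils import (batch, percent_change, crossed,
                                  within_percent_of_value)

    list(batch(range(5), 2))  # -> [[0, 1], [2, 3], [4]]

    percent_change(100.0, 105.0)  # -> 5.0

    # crossed: did the close move through `value` between the two rows?
    crossed(100, {'Adj Close': 99.0}, {'Adj Close': 101.0})  # -> True

    # the `percent` band is split across both sides of `value`,
    # i.e. percent=1 means within +/-0.5% of value
    within_percent_of_value(100.4, 100, percent=1)  # -> True
    within_percent_of_value(100.6, 100, percent=1)  # -> False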
