"""
omegaml write and read large CSV files from/to remote or local file paths
Supports s3, hdfs, http/s, sftp, scp, ssh

Usage:
    # read
    om.datasets.read_csv('protocol://path/test.csv', 'foobar')

    # write
    mdf = om.datasets.getl('tripdata')
    mdf.to_csv('protocol://path/test.csv')

    -- note path can be local, s3, http/s, scp, ftp, ssh (anything that
       smart_open supports, see https://github.com/RaRe-Technologies/smart_open)

(c) 2019, 2020 omegaml.io by oneseven GmbH, Zurich, Switzerland
"""
18import pandas as pd
19from tqdm import tqdm
20
21try:
22 from smart_open import open as open_file
23except:
24 open_file = open
25
26
class IOToolsMDFMixin:
    # mixin for MDataFrame: adds streaming CSV export
    def to_csv(mdf, csvfn, chunksize=10000, maxobs=None, apply=None, mode='w', open_kwargs=None, **kwargs):
        """ write this MDataFrame to s3, hdfs, http/s, sftp, scp, ssh or a local path

        Usage:
            mdf = om.datasets.getl('name')
            mdf.to_csv('/path/filename.csv')

        Args:
            csvfn (str): the fully qualified path and name of the csv file,
               any location supported by smart_open
            chunksize (int): the number of rows fetched and written per chunk
            maxobs (int): currently unused, kept for interface compatibility
            apply (callable): optional, called with each chunk's DataFrame;
               a non-None return value is written instead of the chunk
            mode (str): file open mode, defaults to w
            open_kwargs (dict): additional kwargs passed to open_file
            **kwargs: additional kwargs are passed to ``pandas.to_csv``
        """
        # stream the dataframe chunk by chunk to avoid loading it all in memory
        chunks = mdf.iterchunks(chunksize=chunksize)
        _chunked_to_csv(chunks, csvfn, mode, apply, open_kwargs=open_kwargs, **kwargs)
38
39
class IOToolsStoreMixin:
    def read_csv(self, csvfn, name, chunksize=10000, append=False, apply=None, mode='r',
                 open_kwargs=None, **kwargs):
        """
        read large files from s3, hdfs, http/s, sftp, scp, ssh, write to om.datasets

        Usage:

            To insert a local csv file into a dataset::

                om.datasets.read_csv('/path/filename.csv', 'dataset-name')

            To insert a file stored in any of the supported locations specify
            its fully qualified location and filename. The specific format must
            be specified according to the `smart_open`_ library::

                om.datasets.read_csv('https://...', 'dataset-name')
                om.datasets.read_csv('s3://...', 'dataset-name')
                om.datasets.read_csv('hdfs://...', 'dataset-name')
                om.datasets.read_csv('sftp://...', 'dataset-name')
                om.datasets.read_csv('scp://...', 'dataset-name')
                om.datasets.read_csv('ssh://...', 'dataset-name')

            Optionally define a function to receives each chunk as a dataframe
            and apply further processing (e.g. transformations, filtering)::

                def process(df):
                    # apply any processing to df
                    return df

                om.datasets.read_csv(...., apply=process)

        Args:
            csvfn (str): the fully qualified path and name of the csv file,
                according to the `smart_open`_ library
            name (str): the name of the target dataset in om.datasets
            chunksize (int): the size of each chunk processed before writing
                to the dataset
            append (bool): if True, appends to the dataset. defaults to False
            apply (callable): if specified, each chunk is forwarded as a
                DataFrame and the returned result is inserted to the dataset.
                Use this for transformations or filtering
            mode (str): file open mode, defaults to r
            open_kwargs (dict): additional kwargs to `smart_open`_
            **kwargs: additional kwargs are passed to ``pandas.read_csv``

        Returns:
            MDataFrame

        See Also:

            * `smart_open` https://github.com/RaRe-Technologies/smart_open
            * `pandas.read_csv`

        .. _smart_open: https://github.com/RaRe-Technologies/smart_open
        """
        store = self
        open_kwargs = open_kwargs or {}
        with open_file(csvfn, mode=mode, **open_kwargs) as fin:
            # iterator=True + chunksize streams the file without loading it fully
            it = pd.read_csv(fin, chunksize=chunksize, iterator=True, **kwargs)
            pbar = tqdm(it)
            try:
                for i, chunkdf in enumerate(pbar):
                    if apply:
                        result = apply(chunkdf)
                        # an apply() returning None means "keep the chunk as is"
                        chunkdf = chunkdf if result is None else result
                    # always append from the second chunk onwards, regardless
                    # of the append flag, so chunks accumulate into one dataset
                    store.put(chunkdf, name, append=(i > 0) or append)
            finally:
                # ensure the progress bar is closed even if an iteration fails
                pbar.close()
        return store.getl(name)

    def to_csv(self, name, csvfn, chunksize=10000, apply=None, mode='w', open_kwargs=None, **kwargs):
        """ write any dataframe to s3, hdfs, http/s, sftp, scp, ssh

        Usage:

            To write a dataframe::

                om.datasets.to_csv('dataframe-dataset', '/path/to/filename')

            To write a large dataframe in chunks::

                om.datasets.to_csv('dataframe-dataset', '/path/to/filename',
                                   chunksize=100)

        Args:
            name (str): the name of the dataframe dataset
            csvfn (str): the fully qualified path and name of the csv file,
                according to the `smart_open`_ library
            chunksize (int): the size of each chunk processed before writing
                to the file
            apply (callable): if specified, each chunk is forwarded as a
                DataFrame and the returned result is written to the file.
                Use this for transformations or filtering
            mode (str): file open mode, defaults to w
            open_kwargs (dict): additional kwargs to `smart_open`_
            **kwargs: additional kwargs are passed to ``pandas.to_csv``

        See Also:

            * ``pandas.to_csv``

        .. _smart_open: https://github.com/RaRe-Technologies/smart_open
        """
        # get() with chunksize returns an iterator of DataFrames
        df_iter = self.get(name, chunksize=chunksize)
        _chunked_to_csv(df_iter, csvfn, mode, apply, open_kwargs=open_kwargs, **kwargs)
144
def _chunked_to_csv(df_iter, csvfn, mode, apply, open_kwargs=None, **kwargs):
    """ stream an iterator of DataFrames into a single CSV file

    Args:
        df_iter: iterable of DataFrame chunks
        csvfn (str): the fully qualified path and name of the csv file,
            any location supported by open_file (smart_open)
        mode (str): file open mode, e.g. 'w' or 'a'
        apply (callable): optional, called with each chunk's DataFrame;
            a non-None return value is written instead of the chunk
        open_kwargs (dict): additional kwargs passed to open_file
        **kwargs: additional kwargs are passed to ``DataFrame.to_csv``
    """
    open_kwargs = open_kwargs or {}
    # fix: previously the column header was rewritten for every chunk,
    # corrupting multi-chunk output. Unless the caller passes an explicit
    # header= kwarg, write the header for the first chunk only.
    header = kwargs.pop('header', None)
    with open_file(csvfn, mode, **open_kwargs) as fout:
        for i, chunkdf in tqdm(enumerate(df_iter)):
            if apply:
                result = apply(chunkdf)
                # an apply() returning None means "keep the chunk as is"
                chunkdf = chunkdf if result is None else result
            chunkdf.to_csv(fout, header=(i == 0 if header is None else header), **kwargs)