Source code for omegaml.mixins.mdf.iotools

  1"""
  2omegaml write and read large CSV files from/to remote or local file paths
  3Supports s3, hdfs, http/s, sftp, scp, ssh
  4
  5Usage:
  6    # read
  7    om.read_csv('protocol://path/test.csv', 'foobar')
  8
  9    # write
 10    mdf = om.datasets.getl('tripdata')
 11    mdf.to_csv('protocol://path/test.csv')
 12
 13    -- note path can be local, s3, http/s, scp, ftp, ssh (anything that smart_open supports,
 14       see https://github.com/RaRe-Technologies/smart_open
 15
 16(c) 2019, 2020 omegaml.io by oneseven GmbH, Zurich, Switzerland
 17"""
 18import pandas as pd
 19from tqdm import tqdm
 20
 21try:
 22    from smart_open import open as open_file
 23except:
 24    open_file = open
 25
 26
 27class IOToolsMDFMixin:
 28    def to_csv(mdf, csvfn, chunksize=10000, maxobs=None, apply=None, mode='w', open_kwargs=None, **kwargs):
 29        """
 30        write MDataframe to s3, hdfs, http/s, sftp, scp, ssh, write to om.datasets
 31
 32        Usage:
 33            mdf = om.datasets.getl('name')
 34            mdf.to_csv('/path/filename.csv')
 35        """
 36        df_iter = mdf.iterchunks(chunksize=chunksize)
 37        _chunked_to_csv(df_iter, csvfn, mode, apply, open_kwargs=open_kwargs, **kwargs)
 38
 39
class IOToolsStoreMixin:
    def read_csv(self, csvfn, name, chunksize=10000, append=False, apply=None, mode='r',
                 open_kwargs=None, **kwargs):
        """ read large files from s3, hdfs, http/s, sftp, scp, ssh, write to om.datasets

        Usage:

            To insert a local csv file into a dataset::

                om.datasets.read_csv('/path/filename.csv', 'dataset-name')

            To insert a file stored in any of the supported locations specify
            its fully qualified location and filename. The specific format must
            be specified according to the `smart_open`_ library::

                om.datasets.read_csv('https://...', 'dataset-name')
                om.datasets.read_csv('s3://...', 'dataset-name')
                om.datasets.read_csv('hdfs://...', 'dataset-name')
                om.datasets.read_csv('sftp://...', 'dataset-name')
                om.datasets.read_csv('scp://...', 'dataset-name')
                om.datasets.read_csv('ssh://...', 'dataset-name')

            Optionally define a function that receives each chunk as a dataframe
            and applies further processing (e.g. transformations, filtering)::

                def process(df):
                    # apply any processing to df
                    return df

                om.datasets.read_csv(..., apply=process)

        Args:
            csvfn (str): the fully qualified path and name of the csv file,
               according to the `smart_open`_ library
            name (str): the name of the target dataset
            chunksize (int): the size of each chunk processed before writing
               to the dataset
            append (bool): if True, appends to the dataset. defaults to False
            apply (callable): if specified, each chunk is forwarded as a
               DataFrame and the returned result is inserted to the dataset.
               Use this for transformations or filtering
            mode (str): file open mode, defaults to r
            open_kwargs (dict): additional kwargs to `smart_open`_
            **kwargs: additional kwargs are passed to ``pandas.read_csv``

        Returns:
            MDataFrame

        See Also:

            * `smart_open` https://github.com/RaRe-Technologies/smart_open
            * `pandas.read_csv`

        .. _smart_open: https://github.com/RaRe-Technologies/smart_open
        """
        store = self
        open_kwargs = open_kwargs or {}
        with open_file(csvfn, mode=mode, **open_kwargs) as fin:
            it = pd.read_csv(fin, chunksize=chunksize, iterator=True, **kwargs)
            pbar = tqdm(it)
            try:
                for i, chunkdf in enumerate(pbar):
                    if apply:
                        result = apply(chunkdf)
                        # an apply() that mutates in place may return None;
                        # keep the original chunk in that case
                        chunkdf = chunkdf if result is None else result
                    # the first chunk replaces the dataset unless append=True;
                    # every subsequent chunk always appends
                    store.put(chunkdf, name, append=(i > 0) or append)
            finally:
                # make sure the progress bar is torn down even on errors
                pbar.close()
        return store.getl(name)

    def to_csv(self, name, csvfn, chunksize=10000, apply=None, mode='w', open_kwargs=None, **kwargs):
        """ write any dataframe to s3, hdfs, http/s, sftp, scp, ssh

        Usage:

            To write a dataframe::

                om.datasets.to_csv('dataframe-dataset', '/path/to/filename.csv')

            To write a large dataframe in chunks::

                om.datasets.to_csv('dataframe-dataset', '/path/to/filename.csv',
                                   chunksize=100)

        Args:
            name (str): the name of the dataframe dataset
            csvfn (str): the fully qualified path and name of the csv file,
               according to the `smart_open`_ library
            chunksize (int): the size of each chunk processed before writing
               to the file
            apply (callable): if specified, each chunk is forwarded as a
               DataFrame and the returned result is written to the file.
               Use this for transformations or filtering
            mode (str): file open mode, defaults to w
            open_kwargs (dict): additional kwargs to `smart_open`_
            **kwargs: additional kwargs are passed to ``pandas.to_csv``

        See Also:

            * ``pandas.to_csv``

        .. _smart_open: https://github.com/RaRe-Technologies/smart_open
        """
        # store.get with chunksize yields an iterator of DataFrame chunks
        df_iter = self.get(name, chunksize=chunksize)
        _chunked_to_csv(df_iter, csvfn, mode, apply, open_kwargs=open_kwargs, **kwargs)
def _chunked_to_csv(df_iter, csvfn, mode, apply, open_kwargs=None, **kwargs):
    """ write an iterable of DataFrame chunks to a single csv file

    Args:
        df_iter (iterable): yields pandas DataFrames, one per chunk
        csvfn (str): the fully qualified path and name of the csv file,
           any location supported by the `smart_open`_ library
        mode (str): file open mode, e.g. 'w'
        apply (callable): optional; called with each chunk's DataFrame, and
           if it returns a value other than None that result is written
           instead of the original chunk
        open_kwargs (dict): additional kwargs to `smart_open`_
        **kwargs: additional kwargs are passed to ``pandas.DataFrame.to_csv``

    .. _smart_open: https://github.com/RaRe-Technologies/smart_open
    """
    open_kwargs = open_kwargs or {}
    # bug fix: previously the column header was re-emitted for every chunk,
    # interleaving header rows with the data. Write it only for the first
    # chunk (unless the caller explicitly disabled it via header=False).
    want_header = kwargs.pop('header', True)
    with open_file(csvfn, mode, **open_kwargs) as fout:
        pbar = tqdm(enumerate(df_iter))
        try:
            for i, chunkdf in pbar:
                if apply:
                    result = apply(chunkdf)
                    # apply() may transform in place and return None
                    chunkdf = chunkdf if result is None else result
                chunkdf.to_csv(fout, header=(want_header and i == 0), **kwargs)
        finally:
            # consistent with read_csv: always release the progress bar
            pbar.close()